
Commit cbc046e

Dandandan and claude committed
feat: two-generation early emission for partial aggregation
When the partial aggregate's hash table exceeds a configurable size
threshold (default: 4MB), use a two-generation scheme to emit
intermediate state while keeping the hash table cache-friendly.

When the hot hash table fills up:

1. Emit the cold batch (previous generation's state) downstream
2. Promote the current hot table state to the cold batch
3. Reset the hot hash table and continue reading

This gives recurring groups a second chance to be merged locally before
being sent downstream, reducing the number of partial emissions through
the hash repartition while keeping the working set in CPU cache.

At end-of-input, the remaining hot state and cold batch are concatenated
and emitted together.

New config: datafusion.execution.partial_aggregation_max_table_size

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
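The two-generation scheme described above can be sketched in isolation with a toy aggregation that sums per-key counts. This is a minimal illustration, not the patch's code: `PartialAgg`, `MAX_GROUPS`, and the `Vec`-based "batch" are hypothetical stand-ins for the real hash table, the byte-size threshold (`partial_aggregation_max_table_size`), and Arrow `RecordBatch`es.

```rust
use std::collections::HashMap;

/// Stand-in for the byte-size threshold: promote/emit once the hot
/// table holds this many groups (illustrative, not the real units).
const MAX_GROUPS: usize = 4;

struct PartialAgg {
    hot: HashMap<String, i64>,        // current-generation hash table
    cold: Option<Vec<(String, i64)>>, // previous generation's state
    emitted: Vec<Vec<(String, i64)>>, // batches sent downstream
}

impl PartialAgg {
    fn new() -> Self {
        Self { hot: HashMap::new(), cold: None, emitted: Vec::new() }
    }

    fn update(&mut self, key: &str, value: i64) {
        *self.hot.entry(key.to_string()).or_insert(0) += value;
        if self.hot.len() >= MAX_GROUPS {
            // 1. Emit the cold batch (previous generation), if any
            if let Some(batch) = self.cold.take() {
                self.emitted.push(batch);
            }
            // 2. Promote the hot table's state to the new cold batch
            self.cold = Some(self.hot.drain().collect());
            // 3. Hot table is now empty; continue reading input
        }
    }

    fn finish(&mut self) {
        // At end-of-input, concatenate the remaining hot state with
        // the cold batch and emit them together
        let mut last: Vec<(String, i64)> =
            self.cold.take().into_iter().flatten().collect();
        last.extend(self.hot.drain());
        if !last.is_empty() {
            self.emitted.push(last);
        }
    }
}
```

Note how a key that repeats before the hot table fills ("a" twice, say) is merged locally and leaves as a single row, which is exactly the emission reduction the commit message claims.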
1 parent 0143dfe commit cbc046e

3 files changed

Lines changed: 81 additions & 2 deletions

datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions
@@ -639,6 +639,14 @@ config_namespace! {
     /// aggregation ratio check and trying to switch to skipping aggregation mode
     pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

+    /// Maximum memory (in bytes) that the partial aggregation hash table
+    /// may use before emitting intermediate state and resetting. This
+    /// keeps the hash table small enough to fit in CPU cache, improving
+    /// performance for high-cardinality GROUP BY queries. A value of 0
+    /// disables early emission. Only applies to Partial aggregation mode
+    /// with unordered input.
+    pub partial_aggregation_max_table_size: usize, default = 4_194_304
+
     /// Should DataFusion use row number estimates at the input to decide
     /// whether increasing parallelism is beneficial or not. By default,
     /// only exact row numbers (not estimates) are used for this decision.

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 71 additions & 2 deletions
@@ -437,6 +437,18 @@ pub(crate) struct GroupedHashAggregateStream {
     /// current stream.
     skip_aggregation_probe: Option<SkipAggregationProbe>,

+    /// Maximum size (in bytes) of the hash table before emitting
+    /// intermediate state and resetting during partial aggregation.
+    /// 0 means disabled.
+    early_emit_max_table_size: usize,
+
+    /// Two-generation early emission: the previous generation's partial
+    /// state batch. When the hot hash table fills up, we emit this cold
+    /// batch (if any), then store the hot table's state as the new cold
+    /// batch. Groups appearing across multiple generations get merged
+    /// locally before being sent downstream.
+    early_emit_cold_batch: Option<RecordBatch>,
+
     // ========================================================================
     // EXECUTION RESOURCES:
     // Fields related to managing execution resources and monitoring performance.
@@ -649,6 +661,18 @@ impl GroupedHashAggregateStream {
             None
         };

+        let early_emit_max_table_size = if agg.mode == AggregateMode::Partial
+            && matches!(group_ordering, GroupOrdering::None)
+        {
+            context
+                .session_config()
+                .options()
+                .execution
+                .partial_aggregation_max_table_size
+        } else {
+            0
+        };
+
         let reduction_factor = if agg.mode == AggregateMode::Partial {
             Some(
                 MetricBuilder::new(&agg.metrics)
@@ -680,6 +704,8 @@ impl GroupedHashAggregateStream {
             spill_state,
             group_values_soft_limit: agg.limit_options().map(|config| config.limit()),
             skip_aggregation_probe,
+            early_emit_max_table_size,
+            early_emit_cold_batch: None,
             reduction_factor,
         })
     }
@@ -780,6 +806,39 @@ impl Stream for GroupedHashAggregateStream {
                         }
                     }

+                    // Two-generation early emission: keeps the hash table
+                    // small enough to fit in CPU cache while giving
+                    // recurring groups a second chance to be merged locally.
+                    //
+                    // When the hot table fills:
+                    // 1. Emit the cold batch (previous generation) if any
+                    // 2. Promote current hot table state → cold batch
+                    // 3. Reset hot table and continue reading
+                    if self.early_emit_max_table_size > 0 {
+                        let table_size = self.group_values.size()
+                            + self
+                                .accumulators
+                                .iter()
+                                .map(|x| x.size())
+                                .sum::<usize>();
+                        if table_size >= self.early_emit_max_table_size {
+                            // Take the cold batch to emit
+                            let to_emit = self.early_emit_cold_batch.take();
+                            // Promote hot → cold
+                            let batch_size = self.batch_size;
+                            self.early_emit_cold_batch =
+                                self.emit(EmitTo::All, false)?;
+                            self.clear_shrink(batch_size);
+                            // Emit the previous cold batch if we had one
+                            if let Some(batch) = to_emit {
+                                timer.done();
+                                self.exec_state =
+                                    ExecutionState::ProducingOutput(batch);
+                                break 'reading_input;
+                            }
+                        }
+                    }
+
                     // If we reach this point, try to update the memory reservation
                     // handling out-of-memory conditions as determined by the OOM mode.
                     if let Some(new_state) =
@@ -1221,11 +1280,21 @@ impl GroupedHashAggregateStream {
         self.group_ordering.input_done();
         let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
         let timer = elapsed_compute.timer();
+
         self.exec_state = if self.spill_state.spills.is_empty() {
             // Input has been entirely processed without spilling to disk.

-            // Flush any remaining group values.
-            let batch = self.emit(EmitTo::All, false)?;
+            // Flush any remaining group values from the hot table,
+            // concatenated with the cold batch from early emission.
+            let hot_batch = self.emit(EmitTo::All, false)?;
+            let cold_batch = self.early_emit_cold_batch.take();
+            let batch = match (hot_batch, cold_batch) {
+                (Some(hot), Some(cold)) => {
+                    Some(arrow::compute::concat_batches(&hot.schema(), &[cold, hot])?)
+                }
+                (Some(b), None) | (None, Some(b)) => Some(b),
+                (None, None) => None,
+            };

             // If there are none, we're done; otherwise switch to emitting them
             batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput)
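The size check that gates early emission in the hunk above (group-values size plus the sum of all accumulator sizes, compared against the threshold) can be sketched as a free function. `should_emit` and its constant are illustrative names introduced here, not part of the patch.

```rust
/// Default threshold from the new config option: 4_194_304 bytes = 4 MiB.
const PARTIAL_AGGREGATION_MAX_TABLE_SIZE: usize = 4_194_304;

/// Returns true when the hot table's estimated footprint has reached
/// the threshold and its state should be promoted/emitted.
fn should_emit(group_values_size: usize, accumulator_sizes: &[usize]) -> bool {
    // A threshold of 0 disables early emission entirely
    if PARTIAL_AGGREGATION_MAX_TABLE_SIZE == 0 {
        return false;
    }
    // Footprint = group values + every accumulator's size
    let table_size: usize =
        group_values_size + accumulator_sizes.iter().sum::<usize>();
    table_size >= PARTIAL_AGGREGATION_MAX_TABLE_SIZE
}
```

Because the comparison is `>=`, a table that lands exactly on the 4 MiB boundary is also promoted.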

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
@@ -262,6 +262,7 @@ datafusion.execution.parquet.statistics_truncate_length 64
 datafusion.execution.parquet.use_content_defined_chunking NULL
 datafusion.execution.parquet.write_batch_size 1024
 datafusion.execution.parquet.writer_version 1.0
+datafusion.execution.partial_aggregation_max_table_size 4194304
 datafusion.execution.perfect_hash_join_min_key_density 0.15
 datafusion.execution.perfect_hash_join_small_build_threshold 1024
 datafusion.execution.planning_concurrency 13
@@ -407,6 +408,7 @@ datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statis
 datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups.
 datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows
 datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
+datafusion.execution.partial_aggregation_max_table_size 4194304 Maximum memory (in bytes) that the partial aggregation hash table may use before emitting intermediate state and resetting. This keeps the hash table small enough to fit in CPU cache, improving performance for high-cardinality GROUP BY queries. A value of 0 disables early emission. Only applies to Partial aggregation mode with unordered input.
 datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
 datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
 datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
