Skip to content
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,14 @@ config_namespace! {
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

/// Maximum memory (in bytes) that the partial aggregation hash table
/// may use before emitting intermediate state and resetting. This
/// keeps the hash table small enough to fit in CPU cache, improving
/// performance for high-cardinality GROUP BY queries. A value of 0
/// disables early emission. Only applies to Partial aggregation mode
/// with unordered input.
pub partial_aggregation_max_table_size: usize, default = 4_194_304

/// Should DataFusion use row number estimates at the input to decide
/// whether increasing parallelism is beneficial or not. By default,
/// only exact row numbers (not estimates) are used for this decision.
Expand Down
97 changes: 76 additions & 21 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ impl SkipAggregationProbe {
self.should_skip
}

/// Reports whether the probe has seen enough input rows to have
/// reached its skip/no-skip decision.
fn has_evaluated(&self) -> bool {
    let rows_seen = self.input_rows;
    rows_seen >= self.probe_rows_threshold
}

/// Record the number of rows that were output directly without aggregation
fn record_skipped(&mut self, batch: &RecordBatch) {
self.skipped_aggregation_rows.add(batch.num_rows());
Expand Down Expand Up @@ -437,6 +442,13 @@ pub(crate) struct GroupedHashAggregateStream {
/// current stream.
skip_aggregation_probe: Option<SkipAggregationProbe>,

/// Maximum size (in bytes) of the hash table before emitting
/// accumulated state and resetting during partial aggregation.
/// When exceeded, all groups are emitted as partial state and
/// the hash table is cleared, bounding memory usage.
/// 0 means disabled.
early_emit_max_table_size: usize,

// ========================================================================
// EXECUTION RESOURCES:
// Fields related to managing execution resources and monitoring performance.
Expand Down Expand Up @@ -649,6 +661,22 @@ impl GroupedHashAggregateStream {
None
};

// Emit+reset strategy: when the hash table exceeds a size
// limit, emit all accumulated state and reset the table.
// This bounds memory usage while still benefiting from
// partial aggregation.
let early_emit_max_table_size = if agg.mode == AggregateMode::Partial
&& matches!(group_ordering, GroupOrdering::None)
{
context
.session_config()
.options()
.execution
.partial_aggregation_max_table_size
} else {
0
};

let reduction_factor = if agg.mode == AggregateMode::Partial {
Some(
MetricBuilder::new(&agg.metrics)
Expand Down Expand Up @@ -680,6 +708,7 @@ impl GroupedHashAggregateStream {
spill_state,
group_values_soft_limit: agg.limit_options().map(|config| config.limit()),
skip_aggregation_probe,
early_emit_max_table_size,
reduction_factor,
})
}
Expand Down Expand Up @@ -780,6 +809,31 @@ impl Stream for GroupedHashAggregateStream {
}
}

// Emit+reset: when the hash table exceeds the
// size limit, emit all accumulated state and
// reset the table to bound memory usage.
//
// Only activate after the skip probe has evaluated
// and decided NOT to skip. This prevents emit+reset
// from cycling on high-cardinality queries where
// skip_aggregation should take over instead.
if self.early_emit_max_table_size > 0
&& match &self.skip_aggregation_probe {
None => true,
Some(p) => p.has_evaluated() && !p.should_skip(),
}
&& self.table_size() >= self.early_emit_max_table_size
{
let batch_size = self.batch_size;
if let Some(batch) = self.emit(EmitTo::All, false)? {
self.clear_shrink(batch_size);
timer.done();
self.exec_state =
ExecutionState::ProducingOutput(batch);
break 'reading_input;
}
}

// If we reach this point, try to update the memory reservation
// handling out-of-memory conditions as determined by the OOM mode.
if let Some(new_state) =
Expand Down Expand Up @@ -843,22 +897,18 @@ impl Stream for GroupedHashAggregateStream {
// slice off a part of the batch, if needed
let output_batch;
let size = self.batch_size;
let batch = batch.clone();
(self.exec_state, output_batch) = if batch.num_rows() <= size {
(
if self.input_done {
ExecutionState::Done
}
// In Partial aggregation, we also need to check
// if we should trigger partial skipping
else if self.mode == AggregateMode::Partial
&& self.should_skip_aggregation()
{
ExecutionState::SkippingAggregation
} else {
ExecutionState::ReadingInput
},
batch.clone(),
)
let next_state = if self.input_done {
ExecutionState::Done
} else if self.mode == AggregateMode::Partial
&& self.should_skip_aggregation()
{
ExecutionState::SkippingAggregation
} else {
ExecutionState::ReadingInput
};
(next_state, batch)
} else {
// output first batch_size rows
let size = self.batch_size;
Expand Down Expand Up @@ -1053,9 +1103,8 @@ impl GroupedHashAggregateStream {
}

fn update_memory_reservation(&mut self) -> Result<()> {
let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
let groups_and_acc_size = acc
+ self.group_values.size()
let table_size = self.table_size();
let groups_and_acc_size = table_size
+ self.group_ordering.size()
+ self.current_group_indices.allocated_size();

Expand All @@ -1069,7 +1118,7 @@ impl GroupedHashAggregateStream {
// after clear_shrink is sufficient to cover the sort memory.
let sort_headroom =
if self.oom_mode == OutOfMemoryMode::Spill && !self.group_values.is_empty() {
acc + self.group_values.size()
table_size
} else {
0
};
Expand Down Expand Up @@ -1203,6 +1252,12 @@ impl GroupedHashAggregateStream {
self.clear_shrink(0);
}

/// Returns the combined size of the hash table and accumulators in bytes.
fn table_size(&self) -> usize {
    // Sum the accumulator sizes first, then add the group-values
    // (hash table) footprint; addition order does not matter.
    let accumulator_bytes: usize =
        self.accumulators.iter().map(|acc| acc.size()).sum();
    accumulator_bytes + self.group_values.size()
}

/// returns true if there is a soft groups limit and the number of distinct
/// groups we have seen is over that limit
fn hit_soft_group_limit(&self) -> bool {
Expand All @@ -1221,13 +1276,13 @@ impl GroupedHashAggregateStream {
self.group_ordering.input_done();
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
let timer = elapsed_compute.timer();

self.exec_state = if self.spill_state.spills.is_empty() {
// Input has been entirely processed without spilling to disk.

// Flush any remaining group values.
// Flush any remaining group values from the hash table.
let batch = self.emit(EmitTo::All, false)?;

// If there are none, we're done; otherwise switch to emitting them
batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput)
} else {
// Spill any remaining data to disk. There is some performance overhead in
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ datafusion.execution.parquet.statistics_truncate_length 64
datafusion.execution.parquet.use_content_defined_chunking NULL
datafusion.execution.parquet.write_batch_size 1024
datafusion.execution.parquet.writer_version 1.0
datafusion.execution.partial_aggregation_max_table_size 4194304
datafusion.execution.perfect_hash_join_min_key_density 0.15
datafusion.execution.perfect_hash_join_small_build_threshold 1024
datafusion.execution.planning_concurrency 13
Expand Down Expand Up @@ -407,6 +408,7 @@ datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statis
datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups.
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows
datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
datafusion.execution.partial_aggregation_max_table_size 4194304 Maximum memory (in bytes) that the partial aggregation hash table may use before emitting intermediate state and resetting. This keeps the hash table small enough to fit in CPU cache, improving performance for high-cardinality GROUP BY queries. A value of 0 disables early emission. Only applies to Partial aggregation mode with unordered input.
datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
Expand Down
Loading