From c0f60ae3ceb4f6e5d1ce44de8a4dfba82a23faa3 Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 9 Jun 2026 13:31:19 -0400 Subject: [PATCH 1/6] first iterion for multi-column dictionary benchmarks --- .../multi_column_dictionary_group_values.rs | 217 ++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs diff --git a/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs new file mode 100644 index 0000000000000..9a07f806d983a --- /dev/null +++ b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for `GroupValues` over multiple `Dictionary` +//! columns. Each iteration measures repeated `intern` (N_BATCHES times) +//! followed by `emit(EmitTo::All)`. Covers 2, 4, and 8 group-by columns, +//! batch sizes of 8 KiB and 64 KiB rows, fixed cardinalities (20 / 100 / 500 +//! / 1 000) and an all-unique case where every row is distinct. + +use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray}; +use arrow::buffer::{Buffer, NullBuffer}; +use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef}; +use criterion::{ + BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main, +}; +use datafusion_expr::EmitTo; +use datafusion_physical_plan::aggregates::group_values::new_group_values; +use datafusion_physical_plan::aggregates::order::GroupOrdering; +use rand::rngs::StdRng; +use rand::seq::SliceRandom; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +const SIZES: [usize; 2] = [8 * 1024, 64 * 1024]; +const N_COLS: [usize; 2] = [4, 8]; +const CARDS: [usize; 3] = [20, 100, 500]; +const N_BATCHES: usize = 5; +const NULL_DENSITY: f32 = 0.15; +const SEED: u64 = 0xD1C7; + +fn schema_for_cols(n_cols: usize) -> SchemaRef { + let dict_ty = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let fields: Vec = (0..n_cols) + .map(|i| Field::new(format!("g{i}"), dict_ty.clone(), true)) + .collect(); + Arc::new(Schema::new(fields)) +} + +/// Build a single `Dictionary` column. +/// `card_lo..=card_hi` is sampled per-column so each column in a batch has a +/// different cardinality, matching real-world multi-key GROUP BY data. +fn make_dict_col(size: usize, card_lo: usize, card_hi: usize, null_density: f32, seed: u64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(seed); + let cardinality = if card_lo == card_hi { + card_lo + } else { + rng.random_range(card_lo..=card_hi) + }; + + let strings: Vec = (0..cardinality).map(|i| format!("dict_label_{i:012}")).collect(); + let values = Arc::new(StringArray::from( + strings.iter().map(String::as_str).collect::>(), + )); + + // When the pool is at least as large as the batch, shuffle a prefix so + // every row in this batch maps to a distinct key. + let keys: Vec = if cardinality >= size { + let mut perm: Vec = (0..size as i32).collect(); + perm.shuffle(&mut rng); + perm + } else { + (0..size) + .map(|_| rng.random_range(0..cardinality) as i32) + .collect() + }; + let keys_buf = Buffer::from_slice_ref(&keys); + + let nulls: Option = (null_density > 0.0).then(|| { + (0..size) + .map(|_| !rng.random_bool(null_density as f64)) + .collect() + }); + + let key_array = PrimitiveArray::::new(keys_buf.into(), nulls); + Arc::new(DictionaryArray::::try_new(key_array, values).unwrap()) +} + +/// Build one batch of `n_cols` dictionary columns. +/// Each column independently samples its cardinality from `card_lo..=card_hi`. +fn make_batch( + n_cols: usize, + size: usize, + card_lo: usize, + card_hi: usize, + null_density: f32, + seed: u64, +) -> Vec { + (0..n_cols) + .map(|col| { + make_dict_col( + size, + card_lo, + card_hi, + null_density, + seed.wrapping_add(col as u64 * 0x9E37), + ) + }) + .collect() +} + +fn bench_id(label: &str, n_cols: usize, size: usize, cardinality: usize) -> BenchmarkId { + let card_label = if cardinality == size { + "all_unique".to_string() + } else { + format!("{cardinality}") + }; + BenchmarkId::new( + format!("{label}/cols_{n_cols}"), + format!("size_{size}_card_{card_label}_null_{NULL_DENSITY:.2}"), + ) +} + +fn bench_multi_col_repeated_intern_emit(c: &mut Criterion) { + let mut group = c.benchmark_group("multi_col_dict_repeated_intern_emit"); + + for &n_cols in &N_COLS { + let schema = schema_for_cols(n_cols); + + for &size in &SIZES { + let mut cardinalities = CARDS.to_vec(); + cardinalities.push(size); + + for cardinality in cardinalities { + // For the all-unique case pin both bounds to size; otherwise + // spread each column's cardinality across [card/2, card*2]. + let (card_lo, card_hi) = if cardinality == size { + (size, size) + } else { + ((cardinality / 2).max(1), cardinality * 2) + }; + + // Pre-build all batches outside the timing loop. + let batches: Vec> = (0..N_BATCHES) + .map(|i| { + make_batch( + n_cols, + size, + card_lo, + card_hi, + NULL_DENSITY, + SEED.wrapping_add(i as u64 * 0x1F3D), + ) + }) + .collect(); + + group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); + + group.bench_function(bench_id("repeated", n_cols, size, cardinality), |b| { + b.iter_batched_ref( + || { + ( + new_group_values(schema.clone(), &GroupOrdering::None) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for batch in &batches { + gv.intern(batch.as_slice(), groups).unwrap(); + black_box(&*groups); + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }); + + // Partial-emit variant: after each intern spill the first half + // of accumulated groups, then flush the remainder with All. + group.bench_function(bench_id("partial_emit", n_cols, size, cardinality), |b| { + b.iter_batched_ref( + || { + ( + new_group_values(schema.clone(), &GroupOrdering::None) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for batch in &batches { + gv.intern(batch.as_slice(), groups).unwrap(); + black_box(&*groups); + let half = gv.len() / 2; + if half > 0 { + black_box(gv.emit(EmitTo::First(half)).unwrap()); + } + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }); + } + } + } + + group.finish(); +} + +criterion_group!(benches, bench_multi_col_repeated_intern_emit); +criterion_main!(benches); From d9f1af0ba6221b43c9e6620e25566eeeb765caed Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 9 Jun 2026 15:53:25 -0400 Subject: [PATCH 2/6] introduce multi-column dictionary group value benchmarks --- .../multi_column_dictionary_group_values.rs | 181 +++++++++--------- 1 file changed, 91 insertions(+), 90 deletions(-) diff --git a/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs index 9a07f806d983a..4788e88df12aa 100644 --- a/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs +++ b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs @@ -15,15 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! Benchmarks for `GroupValues` over multiple `Dictionary` -//! columns. Each iteration measures repeated `intern` (N_BATCHES times) -//! followed by `emit(EmitTo::All)`. Covers 2, 4, and 8 group-by columns, -//! batch sizes of 8 KiB and 64 KiB rows, fixed cardinalities (20 / 100 / 500 -//! / 1 000) and an all-unique case where every row is distinct. +//! Benchmarks for `GroupValues` over multiple `Dictionary` columns. +//! Covers 4 and 8 group-by columns, batch sizes of 8 KiB and 64 KiB rows, +//! and cardinalities realistic for multi-column GROUP BY workloads (20 / 100 / 500 / 1 000). use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray}; use arrow::buffer::{Buffer, NullBuffer}; -use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type}; use criterion::{ BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main, }; @@ -38,45 +36,47 @@ use std::sync::Arc; const SIZES: [usize; 2] = [8 * 1024, 64 * 1024]; const N_COLS: [usize; 2] = [4, 8]; -const CARDS: [usize; 3] = [20, 100, 500]; +const CARDS: [usize; 4] = [20, 100, 500, 1_000]; const N_BATCHES: usize = 5; const NULL_DENSITY: f32 = 0.15; const SEED: u64 = 0xD1C7; +/// Per-column cardinality variance as a percentage of the target (half above, half below). +/// each column's distinct count is sampled from [target*0.95, target*1.05]. +const CARDINALITY_RANGE: usize = 10; fn schema_for_cols(n_cols: usize) -> SchemaRef { let dict_ty = - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + DataType::Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)); let fields: Vec = (0..n_cols) .map(|i| Field::new(format!("g{i}"), dict_ty.clone(), true)) .collect(); Arc::new(Schema::new(fields)) } -/// Build a single `Dictionary` column. -/// `card_lo..=card_hi` is sampled per-column so each column in a batch has a -/// different cardinality, matching real-world multi-key GROUP BY data. -fn make_dict_col(size: usize, card_lo: usize, card_hi: usize, null_density: f32, seed: u64) -> ArrayRef { +fn make_dict_col( + size: usize, + num_distinct: usize, + null_density: f32, + seed: u64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(seed); - let cardinality = if card_lo == card_hi { - card_lo - } else { - rng.random_range(card_lo..=card_hi) - }; - let strings: Vec = (0..cardinality).map(|i| format!("dict_label_{i:012}")).collect(); + let strings: Vec = (0..num_distinct) + .map(|i| format!("dict_label_{i:012}")) + .collect(); let values = Arc::new(StringArray::from( strings.iter().map(String::as_str).collect::>(), )); // When the pool is at least as large as the batch, shuffle a prefix so // every row in this batch maps to a distinct key. - let keys: Vec = if cardinality >= size { - let mut perm: Vec = (0..size as i32).collect(); + let keys: Vec = if num_distinct >= size { + let mut perm: Vec = (0..size as u64).collect(); perm.shuffle(&mut rng); perm } else { (0..size) - .map(|_| rng.random_range(0..cardinality) as i32) + .map(|_| rng.random_range(0..num_distinct) as u64) .collect() }; let keys_buf = Buffer::from_slice_ref(&keys); @@ -87,26 +87,27 @@ fn make_dict_col(size: usize, card_lo: usize, card_hi: usize, null_density: f32, .collect() }); - let key_array = PrimitiveArray::::new(keys_buf.into(), nulls); - Arc::new(DictionaryArray::::try_new(key_array, values).unwrap()) + let key_array = PrimitiveArray::::new(keys_buf.into(), nulls); + Arc::new(DictionaryArray::::try_new(key_array, values).unwrap()) } -/// Build one batch of `n_cols` dictionary columns. -/// Each column independently samples its cardinality from `card_lo..=card_hi`. fn make_batch( n_cols: usize, size: usize, - card_lo: usize, - card_hi: usize, + target_distinct: usize, null_density: f32, seed: u64, ) -> Vec { + let half = CARDINALITY_RANGE / 2; + let lo = (target_distinct * (100 - half) / 100).max(1); + let hi = (target_distinct * (100 + half) / 100).max(lo); + let mut rng = StdRng::seed_from_u64(seed); (0..n_cols) .map(|col| { + let num_distinct = rng.random_range(lo..=hi); make_dict_col( size, - card_lo, - card_hi, + num_distinct, null_density, seed.wrapping_add(col as u64 * 0x9E37), ) @@ -114,45 +115,33 @@ fn make_batch( .collect() } -fn bench_id(label: &str, n_cols: usize, size: usize, cardinality: usize) -> BenchmarkId { - let card_label = if cardinality == size { - "all_unique".to_string() - } else { - format!("{cardinality}") - }; +fn bench_id( + label: &str, + n_cols: usize, + size: usize, + target_distinct: usize, +) -> BenchmarkId { BenchmarkId::new( format!("{label}/cols_{n_cols}"), - format!("size_{size}_card_{card_label}_null_{NULL_DENSITY:.2}"), + format!("size_{size}_card_{target_distinct}"), ) } fn bench_multi_col_repeated_intern_emit(c: &mut Criterion) { - let mut group = c.benchmark_group("multi_col_dict_repeated_intern_emit"); + let mut group = c.benchmark_group("multi_column_dictionary_group_values"); for &n_cols in &N_COLS { let schema = schema_for_cols(n_cols); for &size in &SIZES { - let mut cardinalities = CARDS.to_vec(); - cardinalities.push(size); - - for cardinality in cardinalities { - // For the all-unique case pin both bounds to size; otherwise - // spread each column's cardinality across [card/2, card*2]. - let (card_lo, card_hi) = if cardinality == size { - (size, size) - } else { - ((cardinality / 2).max(1), cardinality * 2) - }; - + for &target_distinct in &CARDS { // Pre-build all batches outside the timing loop. let batches: Vec> = (0..N_BATCHES) .map(|i| { make_batch( n_cols, size, - card_lo, - card_hi, + target_distinct, NULL_DENSITY, SEED.wrapping_add(i as u64 * 0x1F3D), ) @@ -161,51 +150,63 @@ fn bench_multi_col_repeated_intern_emit(c: &mut Criterion) { group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); - group.bench_function(bench_id("repeated", n_cols, size, cardinality), |b| { - b.iter_batched_ref( - || { - ( - new_group_values(schema.clone(), &GroupOrdering::None) + group.bench_function( + bench_id("repeated", n_cols, size, target_distinct), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values( + schema.clone(), + &GroupOrdering::None, + ) .unwrap(), - Vec::::with_capacity(size), - ) - }, - |(gv, groups)| { - for batch in &batches { - gv.intern(batch.as_slice(), groups).unwrap(); - black_box(&*groups); - } - black_box(gv.emit(EmitTo::All).unwrap()); - }, - BatchSize::SmallInput, - ); - }); + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for batch in &batches { + gv.intern(batch.as_slice(), groups).unwrap(); + black_box(&*groups); + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); // Partial-emit variant: after each intern spill the first half // of accumulated groups, then flush the remainder with All. - group.bench_function(bench_id("partial_emit", n_cols, size, cardinality), |b| { - b.iter_batched_ref( - || { - ( - new_group_values(schema.clone(), &GroupOrdering::None) + group.bench_function( + bench_id("partial_emit", n_cols, size, target_distinct), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values( + schema.clone(), + &GroupOrdering::None, + ) .unwrap(), - Vec::::with_capacity(size), - ) - }, - |(gv, groups)| { - for batch in &batches { - gv.intern(batch.as_slice(), groups).unwrap(); - black_box(&*groups); - let half = gv.len() / 2; - if half > 0 { - black_box(gv.emit(EmitTo::First(half)).unwrap()); + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for batch in &batches { + gv.intern(batch.as_slice(), groups).unwrap(); + black_box(&*groups); + let half = gv.len() / 2; + if half > 0 { + black_box(gv.emit(EmitTo::First(half)).unwrap()); + } } - } - black_box(gv.emit(EmitTo::All).unwrap()); - }, - BatchSize::SmallInput, - ); - }); + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); } } } From 9fd8d2a4156ec59b784185a3055812f8ecc6845b Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 10 Jun 2026 17:14:21 -0400 Subject: [PATCH 3/6] re-run CI From 45005c48ee01148d9e7122e7dd519c7b9ef84e63 Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 15 Jun 2026 10:05:18 -0400 Subject: [PATCH 4/6] fix distinct_rows calculation and introduce benchmarks to track cross-product aggregations --- .../multi_column_dictionary_group_values.rs | 234 +++++++++++++++--- 1 file changed, 198 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs index 4788e88df12aa..4795f38ccaa8e 100644 --- a/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs +++ b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -//! Benchmarks for `GroupValues` over multiple `Dictionary` columns. +//! Benchmarks for `GroupValues` over multiple `Dictionary` columns. //! Covers 4 and 8 group-by columns, batch sizes of 8 KiB and 64 KiB rows, //! and cardinalities realistic for multi-column GROUP BY workloads (20 / 100 / 500 / 1 000). -use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray}; +use arrow::array::{Array, ArrayRef, DictionaryArray, PrimitiveArray, StringArray}; use arrow::buffer::{Buffer, NullBuffer}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type}; use criterion::{ @@ -40,9 +40,6 @@ const CARDS: [usize; 4] = [20, 100, 500, 1_000]; const N_BATCHES: usize = 5; const NULL_DENSITY: f32 = 0.15; const SEED: u64 = 0xD1C7; -/// Per-column cardinality variance as a percentage of the target (half above, half below). -/// each column's distinct count is sampled from [target*0.95, target*1.05]. -const CARDINALITY_RANGE: usize = 10; fn schema_for_cols(n_cols: usize) -> SchemaRef { let dict_ty = @@ -53,35 +50,54 @@ fn schema_for_cols(n_cols: usize) -> SchemaRef { Arc::new(Schema::new(fields)) } +fn count_distinct_tuples(cols: &[ArrayRef]) -> usize { + use std::collections::HashSet; + let n = cols[0].len(); + let mut seen: HashSet>> = HashSet::new(); + for row in 0..n { + let key: Vec> = cols + .iter() + .map(|c| { + let dict = c + .as_any() + .downcast_ref::>() + .unwrap(); + if dict.is_null(row) { + None + } else { + Some(dict.keys().value(row)) + } + }) + .collect(); + seen.insert(key); + } + seen.len() +} + fn make_dict_col( size: usize, - num_distinct: usize, + group_ids: &[usize], + col_idx: usize, + per_col_card: usize, null_density: f32, seed: u64, ) -> ArrayRef { - let mut rng = StdRng::seed_from_u64(seed); - - let strings: Vec = (0..num_distinct) + let strings: Vec = (0..per_col_card) .map(|i| format!("dict_label_{i:012}")) .collect(); let values = Arc::new(StringArray::from( strings.iter().map(String::as_str).collect::>(), )); - // When the pool is at least as large as the batch, shuffle a prefix so - // every row in this batch maps to a distinct key. - let keys: Vec = if num_distinct >= size { - let mut perm: Vec = (0..size as u64).collect(); - perm.shuffle(&mut rng); - perm - } else { - (0..size) - .map(|_| rng.random_range(0..num_distinct) as u64) - .collect() - }; + let divisor = per_col_card.pow(col_idx as u32); + let keys: Vec = group_ids + .iter() + .map(|&g| ((g / divisor) % per_col_card) as u64) + .collect(); let keys_buf = Buffer::from_slice_ref(&keys); let nulls: Option = (null_density > 0.0).then(|| { + let mut rng = StdRng::seed_from_u64(seed); (0..size) .map(|_| !rng.random_bool(null_density as f64)) .collect() @@ -89,8 +105,14 @@ fn make_dict_col( let key_array = PrimitiveArray::::new(keys_buf.into(), nulls); Arc::new(DictionaryArray::::try_new(key_array, values).unwrap()) + as ArrayRef } +/// Each row is assigned a `group_id` (0..`target_distinct`). Column keys are +/// derived from `group_id` via mixed-radix decomposition (treating `group_id` +/// as a base-k number and reading off one digit per column), so rows with the +/// same `group_id` always produce the same tuple. This keeps distinct groups at +/// exactly `target_distinct` regardless of column count. fn make_batch( n_cols: usize, size: usize, @@ -98,21 +120,72 @@ fn make_batch( null_density: f32, seed: u64, ) -> Vec { - let half = CARDINALITY_RANGE / 2; - let lo = (target_distinct * (100 - half) / 100).max(1); - let hi = (target_distinct * (100 + half) / 100).max(lo); let mut rng = StdRng::seed_from_u64(seed); - (0..n_cols) + + // When nulls are present all null rows coalesce into one extra group + // (None, None, …), so we generate one fewer non-null group to keep the + // total at exactly target_distinct. + let n_groups = if null_density > 0.0 { + target_distinct.saturating_sub(1).max(1) + } else { + target_distinct + }; + + let mut per_col_card = (n_groups as f64).powf(1.0 / n_cols as f64).ceil() as usize; + per_col_card = per_col_card.max(1); + while per_col_card.saturating_pow(n_cols as u32) < n_groups { + per_col_card += 1; + } + + let n_extra = size.saturating_sub(n_groups); + let mut group_ids: Vec = (0..n_groups.min(size)).collect(); + group_ids.extend((0..n_extra).map(|_| rng.random_range(0..n_groups))); + group_ids.shuffle(&mut rng); + + let cols: Vec = (0..n_cols) + .map(|col| make_dict_col(size, &group_ids, col, per_col_card, null_density, seed)) + .collect(); + + // run `BENCH_VALIDATE=1 cargo bench --bench multi_column_dictionary_group_values -- --list` to validate that the generated batches have the expected number of distinct groups + if std::env::var("BENCH_VALIDATE").is_ok() { + let actual = count_distinct_tuples(&cols); + eprintln!( + "validate: cols={n_cols} size={size} target={target_distinct} actual={actual}" + ); + } + + cols +} + +/// Each column independently samples from its own `target_distinct` value pool +/// (like GROUP BY department, name, age), so actual distinct groups grow with +/// the cross-product of column cardinalities. +fn make_batch_independent( + n_cols: usize, + size: usize, + target_distinct: usize, + null_density: f32, + seed: u64, +) -> Vec { + let cols: Vec = (0..n_cols) .map(|col| { - let num_distinct = rng.random_range(lo..=hi); - make_dict_col( - size, - num_distinct, - null_density, - seed.wrapping_add(col as u64 * 0x9E37), - ) + let mut rng = StdRng::seed_from_u64(seed.wrapping_add(col as u64 * 0x9E37)); + let group_ids: Vec = (0..size) + .map(|_| rng.random_range(0..target_distinct)) + .collect(); + // col_idx=0, per_col_card=target_distinct → key == group_id directly + make_dict_col(size, &group_ids, 0, target_distinct, null_density, seed) }) - .collect() + .collect(); + + if std::env::var("BENCH_VALIDATE").is_ok() { + let actual = count_distinct_tuples(&cols); + eprintln!( + "validate_independent: cols={n_cols} size={size} per_col_card={target_distinct} actual={actual}" + ); + } + + cols } fn bench_id( @@ -135,7 +208,6 @@ fn bench_multi_col_repeated_intern_emit(c: &mut Criterion) { for &size in &SIZES { for &target_distinct in &CARDS { - // Pre-build all batches outside the timing loop. let batches: Vec> = (0..N_BATCHES) .map(|i| { make_batch( @@ -176,8 +248,6 @@ fn bench_multi_col_repeated_intern_emit(c: &mut Criterion) { }, ); - // Partial-emit variant: after each intern spill the first half - // of accumulated groups, then flush the remainder with All. group.bench_function( bench_id("partial_emit", n_cols, size, target_distinct), |b| { @@ -214,5 +284,97 @@ fn bench_multi_col_repeated_intern_emit(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_multi_col_repeated_intern_emit); +fn bench_multi_col_independent_columns(c: &mut Criterion) { + let mut group = c.benchmark_group("multi_column_dictionary_independent"); + + const INDEPENDENT_SIZE: usize = 8 * 1024; + const INDEPENDENT_CARDS: [usize; 3] = [20, 100, 500]; + + for &n_cols in &N_COLS { + let schema = schema_for_cols(n_cols); + + for &target_distinct in &INDEPENDENT_CARDS { + let size = INDEPENDENT_SIZE; + { + let batches: Vec> = (0..N_BATCHES) + .map(|i| { + make_batch_independent( + n_cols, + size, + target_distinct, + NULL_DENSITY, + SEED.wrapping_add(i as u64 * 0x1F3D), + ) + }) + .collect(); + + group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); + + group.bench_function( + bench_id("repeated", n_cols, size, target_distinct), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values( + schema.clone(), + &GroupOrdering::None, + ) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for batch in &batches { + gv.intern(batch.as_slice(), groups).unwrap(); + black_box(&*groups); + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); + + group.bench_function( + bench_id("partial_emit", n_cols, size, target_distinct), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values( + schema.clone(), + &GroupOrdering::None, + ) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for batch in &batches { + gv.intern(batch.as_slice(), groups).unwrap(); + black_box(&*groups); + let half = gv.len() / 2; + if half > 0 { + black_box(gv.emit(EmitTo::First(half)).unwrap()); + } + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); + } + } + } + + group.finish(); +} + +criterion_group!( + benches, + bench_multi_col_repeated_intern_emit, + bench_multi_col_independent_columns +); criterion_main!(benches); From 0a2cac47fafcc005628bfa293e6969b35546503c Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 15 Jun 2026 10:19:42 -0400 Subject: [PATCH 5/6] fix github issues --- .../benches/multi_column_dictionary_group_values.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs index 4795f38ccaa8e..d8845734af2c3 100644 --- a/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs +++ b/datafusion/physical-plan/benches/multi_column_dictionary_group_values.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Benchmarks for `GroupValues` over multiple `Dictionary` columns. +//! Benchmarks for `GroupValues` over multiple `Dictionary` columns. //! Covers 4 and 8 group-by columns, batch sizes of 8 KiB and 64 KiB rows, //! and cardinalities realistic for multi-column GROUP BY workloads (20 / 100 / 500 / 1 000). From 133364247a3b53785d541e7624e73a9c6eedbae7 Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 16 Jun 2026 08:25:16 -0400 Subject: [PATCH 6/6] include benchmark harness in cargo.toml --- datafusion/physical-plan/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 0fc75043bf333..26c4576b459a7 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -137,3 +137,7 @@ name = "dictionary_group_values" [[bench]] harness = false name = "multi_group_by" + +[[bench]] +harness = false +name = "multi_column_dictionary_group_values" \ No newline at end of file