From c3969500a004498b49a023de60827e234bfbd715 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 9 Feb 2026 22:14:48 +0800 Subject: [PATCH 1/7] perf array_union/array_intersect --- .../functions-nested/benches/array_set_ops.rs | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 datafusion/functions-nested/benches/array_set_ops.rs diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs new file mode 100644 index 0000000000000..5a523bbebb2ac --- /dev/null +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::set_ops::{ArrayUnion, ArrayIntersect}; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 1000; +const ARRAY_SIZES: &[usize] = &[10, 50, 100]; +const SEED: u64 = 42; + +fn criterion_benchmark(c: &mut Criterion) { + bench_array_union(c); + bench_array_intersect(c); +} + +fn invoke_array_union(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { + let args = vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ]; + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args, + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn invoke_array_intersect(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { + let args = vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ]; + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args, + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn bench_array_union(c: &mut Criterion) { + let mut group = c.benchmark_group("array_union"); + let udf = ArrayUnion::new(); + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.8); + group.bench_with_input( + BenchmarkId::new("high_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_union(&udf, &array1, &array2)), + ); + } + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.2); + group.bench_with_input( + BenchmarkId::new("low_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_union(&udf, &array1, &array2)), + ); + } + + group.finish(); +} + +fn bench_array_intersect(c: &mut Criterion) { + let mut group = c.benchmark_group("array_intersect"); + let udf = ArrayIntersect::new(); + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.8); + group.bench_with_input( + BenchmarkId::new("high_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_intersect(&udf, &array1, &array2)), + ); + } + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.2); + group.bench_with_input( + BenchmarkId::new("low_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_intersect(&udf, &array1, &array2)), + ); + } + + group.finish(); +} + +fn create_arrays_with_overlap(num_rows: usize, array_size: usize, overlap_ratio: f64) -> (ArrayRef, ArrayRef) { + let mut rng = StdRng::seed_from_u64(SEED); + + let values1 = (0..num_rows * array_size) + .map(|_| Some(rng.random_range(0..(array_size * 10) as i64))) + .collect::(); + let offsets1 = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + let mut values2 = Vec::new(); + for row in 0..num_rows { + let start = row * array_size; + let _end = start + array_size; + let overlap_count = (array_size as f64 * overlap_ratio) as usize; + + for i in 0..overlap_count { + values2.push(values1.value(start + i)); + } + for _ in overlap_count..array_size { + values2.push(rng.random_range(0..(array_size * 10) as i64)); + } + } + + let values2 = Int64Array::from(values2); + let offsets2 = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + let array1 = Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(offsets1.into()), + Arc::new(values1), + None, + ) + .unwrap(), + ); + + let array2 = Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(offsets2.into()), + Arc::new(values2), + None, + ) + .unwrap(), + ); + + (array1, array2) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 01affff84a9ec48930b5d2f20a1c112ebc3d65e6 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 9 Feb 2026 22:15:03 +0800 Subject: [PATCH 2/7] perf array_union/array_intersect --- datafusion/functions-nested/Cargo.toml | 4 + .../functions-nested/benches/array_set_ops.rs | 82 ++++++++----- datafusion/functions-nested/src/set_ops.rs | 113 ++++++++++-------- datafusion/sqllogictest/test_files/array.slt | 12 +- 4 files changed, 123 insertions(+), 88 deletions(-) diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index bfbfbd56c8baf..e5e601f30ae84 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -92,3 +92,7 @@ name = "array_remove" [[bench]] harness = false name = "array_repeat" + +[[bench]] +harness = false +name = "array_set_ops" diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index 5a523bbebb2ac..a42dc130eea12 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -24,9 +24,9 @@ use arrow::datatypes::{DataType, Field}; use criterion::{BenchmarkId, Criterion}; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; -use datafusion_functions_nested::set_ops::{ArrayUnion, ArrayIntersect}; -use rand::Rng; +use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; use rand::SeedableRng; +use rand::prelude::SliceRandom; use rand::rngs::StdRng; use std::hint::black_box; use std::sync::Arc; @@ -41,13 +41,12 @@ fn criterion_benchmark(c: &mut Criterion) { } fn invoke_array_union(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { - let args = vec![ - ColumnarValue::Array(array1.clone()), - ColumnarValue::Array(array2.clone()), - ]; black_box( udf.invoke_with_args(ScalarFunctionArgs { - args, + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], arg_fields: vec![ Field::new("arr1", array1.data_type().clone(), false).into(), Field::new("arr2", array2.data_type().clone(), false).into(), @@ -60,14 +59,17 @@ fn invoke_array_union(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &Arra ); } -fn invoke_array_intersect(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { - let args = vec![ - ColumnarValue::Array(array1.clone()), - ColumnarValue::Array(array2.clone()), - ]; +fn invoke_array_intersect( + udf: &impl ScalarUDFImpl, + array1: &ArrayRef, + array2: &ArrayRef, +) { black_box( udf.invoke_with_args(ScalarFunctionArgs { - args, + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], arg_fields: vec![ Field::new("arr1", array1.data_type().clone(), false).into(), Field::new("arr2", array2.data_type().clone(), false).into(), @@ -130,39 +132,53 @@ fn bench_array_intersect(c: &mut Criterion) { group.finish(); } -fn create_arrays_with_overlap(num_rows: usize, array_size: usize, overlap_ratio: f64) -> (ArrayRef, ArrayRef) { +fn create_arrays_with_overlap( + num_rows: usize, + array_size: usize, + overlap_ratio: f64, +) -> (ArrayRef, ArrayRef) { + assert!((0.0..=1.0).contains(&overlap_ratio)); + let overlap_count = ((array_size as f64) * overlap_ratio).round() as usize; + let mut rng = StdRng::seed_from_u64(SEED); - let values1 = (0..num_rows * array_size) - .map(|_| Some(rng.random_range(0..(array_size * 10) as i64))) - .collect::(); - let offsets1 = (0..=num_rows) - .map(|i| (i * array_size) as i32) - .collect::>(); + let mut values1 = Vec::with_capacity(num_rows * array_size); + let mut values2 = Vec::with_capacity(num_rows * array_size); - let mut values2 = Vec::new(); for row in 0..num_rows { - let start = row * array_size; - let _end = start + array_size; - let overlap_count = (array_size as f64 * overlap_ratio) as usize; + let base = (row as i64) * (array_size as i64) * 2; - for i in 0..overlap_count { - values2.push(values1.value(start + i)); + for i in 0..array_size { + values1.push(base + i as i64); } - for _ in overlap_count..array_size { - values2.push(rng.random_range(0..(array_size * 10) as i64)); + + let mut positions: Vec = (0..array_size).collect(); + positions.shuffle(&mut rng); + + let overlap_positions = &positions[..overlap_count]; + + for i in 0..array_size { + if overlap_positions.contains(&i) { + values2.push(base + i as i64); + } else { + values2.push(base + array_size as i64 + i as i64); + } } } + let values1 = Int64Array::from(values1); let values2 = Int64Array::from(values2); - let offsets2 = (0..=num_rows) + + let field = Arc::new(Field::new("item", DataType::Int64, true)); + + let offsets = (0..=num_rows) .map(|i| (i * array_size) as i32) .collect::>(); let array1 = Arc::new( ListArray::try_new( - Arc::new(Field::new("item", DataType::Int64, true)), - OffsetBuffer::new(offsets1.into()), + field.clone(), + OffsetBuffer::new(offsets.clone().into()), Arc::new(values1), None, ) @@ -171,8 +187,8 @@ fn create_arrays_with_overlap(num_rows: usize, array_size: usize, overlap_ratio: let array2 = Arc::new( ListArray::try_new( - Arc::new(Field::new("item", DataType::Int64, true)), - OffsetBuffer::new(offsets2.into()), + field, + OffsetBuffer::new(offsets.into()), Arc::new(values2), None, ) diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 8799af6d491c2..4c71e7e3ab6d3 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -184,7 +184,7 @@ impl ScalarUDFImpl for ArrayUnion { ) )] #[derive(Debug, PartialEq, Eq, Hash)] -pub(super) struct ArrayIntersect { +pub struct ArrayIntersect { signature: Signature, aliases: Vec, } @@ -358,69 +358,84 @@ fn generic_set_lists( "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" ); - let mut offsets = vec![OffsetSize::usize_as(0)]; - let mut new_arrays = vec![]; + // Convert all values to rows in batch for performance. let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - for (l_arr, r_arr) in l.iter().zip(r.iter()) { - let last_offset = *offsets.last().unwrap(); - - let (l_values, r_values) = match (l_arr, r_arr) { - (Some(l_arr), Some(r_arr)) => ( - converter.convert_columns(&[l_arr])?, - converter.convert_columns(&[r_arr])?, - ), - _ => { - offsets.push(last_offset); - continue; - } - }; - - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect() - } else { - vec![] - }; + let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; + let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + let l_offsets = l.value_offsets(); + let r_offsets = r.value_offsets(); + + let mut result_offsets = Vec::with_capacity(l.len() + 1); + result_offsets.push(OffsetSize::usize_as(0)); + let mut final_rows = Vec::with_capacity(rows_l.num_rows()); + + // Reuse hash sets across iterations + let mut seen = HashSet::new(); + let mut r_set = HashSet::new(); + for i in 0..l.len() { + let last_offset = *result_offsets.last().unwrap(); + + if l.is_null(i) || r.is_null(i) { + result_offsets.push(last_offset); + continue; + } - for r_val in r_values.iter().sorted().dedup() { - match set_op { - SetOp::Union => { - if !values_set.contains(&r_val) { - rows.push(r_val); + let l_start = l_offsets[i].as_usize(); + let l_end = l_offsets[i + 1].as_usize(); + let r_start = r_offsets[i].as_usize(); + let r_end = r_offsets[i + 1].as_usize(); + + let mut count = 0usize; + // Clear sets for reuse + seen.clear(); + r_set.clear(); + + match set_op { + SetOp::Union => { + for idx in l_start..l_end { + let row = rows_l.row(idx); + if seen.insert(row) { + final_rows.push(row); + count += 1; } } - SetOp::Intersect => { - if values_set.contains(&r_val) { - rows.push(r_val); + for idx in r_start..r_end { + let row = rows_r.row(idx); + if seen.insert(row) { + final_rows.push(row); + count += 1; } } } - } - - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => Arc::clone(array), - None => { - return internal_err!("{set_op}: failed to get array from rows"); + SetOp::Intersect => { + // Build hash set from right array for lookup table + // then iterator left array to find common elements. + for idx in r_start..r_end { + r_set.insert(rows_r.row(idx)); + } + for idx in l_start..l_end { + let row = rows_l.row(idx); + if r_set.contains(&row) && seen.insert(row) { + final_rows.push(row); + count += 1; + } + } } - }; - - new_arrays.push(array); + } + result_offsets.push(last_offset + OffsetSize::usize_as(count)); } - let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect(); - let values = if new_arrays_ref.is_empty() { + let final_values = if final_rows.is_empty() { new_empty_array(&l.value_type()) } else { - compute::concat(&new_arrays_ref)? + let arrays = converter.convert_rows(final_rows)?; + Arc::clone(&arrays[0]) }; + let arr = GenericListArray::::try_new( field, - offsets, - values, + OffsetBuffer::new(result_offsets.into()), + final_values, NullBuffer::union(l.nulls(), r.nulls()), )?; Ok(Arc::new(arr)) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 2b98ae14d298b..8c0752d2ed577 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -6905,7 +6905,7 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from array_intersect_table_1D_Boolean; ---- -[] [false, true] [false] +[] [true, false] [false] [false] [true] [true] query ??? @@ -6914,7 +6914,7 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from large_array_intersect_table_1D_Boolean; ---- -[] [false, true] [false] +[] [true, false] [false] [false] [true] [true] query ??? @@ -6923,8 +6923,8 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from array_intersect_table_1D_UTF8; ---- -[bc] [arrow, rust] [] -[] [arrow, datafusion, rust] [arrow, rust] +[bc] [rust, arrow] [] +[] [datafusion, rust, arrow] [rust, arrow] query ??? select array_intersect(column1, column2), @@ -6932,8 +6932,8 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from large_array_intersect_table_1D_UTF8; ---- -[bc] [arrow, rust] [] -[] [arrow, datafusion, rust] [arrow, rust] +[bc] [rust, arrow] [] +[] [datafusion, rust, arrow] [rust, arrow] query ? select array_intersect(column1, column2) From 029bfb349092fa24344ffc4a5236a23fcaf54826 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 9 Feb 2026 22:37:41 +0800 Subject: [PATCH 3/7] clippy --- datafusion/functions-nested/src/set_ops.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 4c71e7e3ab6d3..2acd6b8d71408 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -189,6 +189,12 @@ pub struct ArrayIntersect { aliases: Vec, } +impl Default for ArrayIntersect { + fn default() -> Self { + Self::new() + } +} + impl ArrayIntersect { pub fn new() -> Self { Self { From 0eb66b9df4213cde2bfee59516e73c2600683389 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 9 Feb 2026 22:59:48 +0800 Subject: [PATCH 4/7] license --- datafusion/functions-nested/benches/array_set_ops.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index a42dc130eea12..b2d77507962f3 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -3,7 +3,7 @@ // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the -// "License"); you may not this file except in compliance +// "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 From 8284bbb055efc95d793b63fb478d2c3b321825e4 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 10 Feb 2026 22:42:24 +0800 Subject: [PATCH 5/7] simplify benches --- .../functions-nested/benches/array_set_ops.rs | 79 ++++++------------- 1 file changed, 22 insertions(+), 57 deletions(-) diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index b2d77507962f3..663665a31899e 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -28,6 +28,7 @@ use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; use rand::SeedableRng; use rand::prelude::SliceRandom; use rand::rngs::StdRng; +use std::collections::HashSet; use std::hint::black_box; use std::sync::Arc; @@ -40,30 +41,7 @@ fn criterion_benchmark(c: &mut Criterion) { bench_array_intersect(c); } -fn invoke_array_union(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { - black_box( - udf.invoke_with_args(ScalarFunctionArgs { - args: vec![ - ColumnarValue::Array(array1.clone()), - ColumnarValue::Array(array2.clone()), - ], - arg_fields: vec![ - Field::new("arr1", array1.data_type().clone(), false).into(), - Field::new("arr2", array2.data_type().clone(), false).into(), - ], - number_rows: NUM_ROWS, - return_field: Field::new("result", array1.data_type().clone(), false).into(), - config_options: Arc::new(ConfigOptions::default()), - }) - .unwrap(), - ); -} - -fn invoke_array_intersect( - udf: &impl ScalarUDFImpl, - array1: &ArrayRef, - array2: &ArrayRef, -) { +fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { black_box( udf.invoke_with_args(ScalarFunctionArgs { args: vec![ @@ -86,22 +64,15 @@ fn bench_array_union(c: &mut Criterion) { let mut group = c.benchmark_group("array_union"); let udf = ArrayUnion::new(); - for &array_size in ARRAY_SIZES { - let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.8); - group.bench_with_input( - BenchmarkId::new("high_overlap", array_size), - &array_size, - |b, _| b.iter(|| invoke_array_union(&udf, &array1, &array2)), - ); - } - - for &array_size in ARRAY_SIZES { - let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.2); - group.bench_with_input( - BenchmarkId::new("low_overlap", array_size), - &array_size, - |b, _| b.iter(|| invoke_array_union(&udf, &array1, &array2)), - ); + for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + group.bench_with_input( + BenchmarkId::new(*overlap_label, array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), + ); + } } group.finish(); @@ -111,22 +82,15 @@ fn bench_array_intersect(c: &mut Criterion) { let mut group = c.benchmark_group("array_intersect"); let udf = ArrayIntersect::new(); - for &array_size in ARRAY_SIZES { - let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.8); - group.bench_with_input( - BenchmarkId::new("high_overlap", array_size), - &array_size, - |b, _| b.iter(|| invoke_array_intersect(&udf, &array1, &array2)), - ); - } - - for &array_size in ARRAY_SIZES { - let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.2); - group.bench_with_input( - BenchmarkId::new("low_overlap", array_size), - &array_size, - |b, _| b.iter(|| invoke_array_intersect(&udf, &array1, &array2)), - ); + for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + group.bench_with_input( + BenchmarkId::new(*overlap_label, array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), + ); + } } group.finish(); @@ -155,7 +119,8 @@ fn create_arrays_with_overlap( let mut positions: Vec = (0..array_size).collect(); positions.shuffle(&mut rng); - let overlap_positions = &positions[..overlap_count]; + let overlap_positions: HashSet<_> = + positions[..overlap_count].iter().copied().collect(); for i in 0..array_size { if overlap_positions.contains(&i) { From 0e2dc47f7bf22ceb72fa7b4c7265fb860a73400a Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 10 Feb 2026 22:44:16 +0800 Subject: [PATCH 6/7] simplify benches --- datafusion/functions-nested/benches/array_set_ops.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index 663665a31899e..2b21c50b27823 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -66,7 +66,8 @@ fn bench_array_union(c: &mut Criterion) { for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { for &array_size in ARRAY_SIZES { - let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + let (array1, array2) = + create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); group.bench_with_input( BenchmarkId::new(*overlap_label, array_size), &array_size, @@ -84,7 +85,8 @@ fn bench_array_intersect(c: &mut Criterion) { for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { for &array_size in ARRAY_SIZES { - let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + let (array1, array2) = + create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); group.bench_with_input( BenchmarkId::new(*overlap_label, array_size), &array_size, From e718bc7113df54555929b2636e3d0d42cbf0d032 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 10 Feb 2026 22:57:44 +0800 Subject: [PATCH 7/7] optimize by suggestions. --- datafusion/functions-nested/src/set_ops.rs | 99 ++++++++++++++-------- 1 file changed, 66 insertions(+), 33 deletions(-) diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 2acd6b8d71408..370599611feef 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -368,16 +368,45 @@ fn generic_set_lists( let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + + match set_op { + SetOp::Union => generic_set_loop::( + l, r, &rows_l, &rows_r, field, &converter, + ), + SetOp::Intersect => generic_set_loop::( + l, r, &rows_l, &rows_r, field, &converter, + ), + } +} + +/// Inner loop for set operations, parameterized by const generic to +/// avoid branching inside the hot loop. +fn generic_set_loop( + l: &GenericListArray, + r: &GenericListArray, + rows_l: &arrow::row::Rows, + rows_r: &arrow::row::Rows, + field: Arc, + converter: &RowConverter, +) -> Result { let l_offsets = l.value_offsets(); let r_offsets = r.value_offsets(); let mut result_offsets = Vec::with_capacity(l.len() + 1); result_offsets.push(OffsetSize::usize_as(0)); - let mut final_rows = Vec::with_capacity(rows_l.num_rows()); + let initial_capacity = if IS_UNION { + // Union can include all elements from both sides + rows_l.num_rows() + } else { + // Intersect result is bounded by the smaller side + rows_l.num_rows().min(rows_r.num_rows()) + }; + + let mut final_rows = Vec::with_capacity(initial_capacity); // Reuse hash sets across iterations let mut seen = HashSet::new(); - let mut r_set = HashSet::new(); + let mut lookup_set = HashSet::new(); for i in 0..l.len() { let last_offset = *result_offsets.last().unwrap(); @@ -391,44 +420,48 @@ fn generic_set_lists( let r_start = r_offsets[i].as_usize(); let r_end = r_offsets[i + 1].as_usize(); - let mut count = 0usize; - // Clear sets for reuse seen.clear(); - r_set.clear(); - - match set_op { - SetOp::Union => { - for idx in l_start..l_end { - let row = rows_l.row(idx); - if seen.insert(row) { - final_rows.push(row); - count += 1; - } - } - for idx in r_start..r_end { - let row = rows_r.row(idx); - if seen.insert(row) { - final_rows.push(row); - count += 1; - } + + if IS_UNION { + for idx in l_start..l_end { + let row = rows_l.row(idx); + if seen.insert(row) { + final_rows.push(row); } } - SetOp::Intersect => { - // Build hash set from right array for lookup table - // then iterator left array to find common elements. - for idx in r_start..r_end { - r_set.insert(rows_r.row(idx)); + for idx in r_start..r_end { + let row = rows_r.row(idx); + if seen.insert(row) { + final_rows.push(row); } - for idx in l_start..l_end { - let row = rows_l.row(idx); - if r_set.contains(&row) && seen.insert(row) { - final_rows.push(row); - count += 1; - } + } + } else { + let l_len = l_end - l_start; + let r_len = r_end - r_start; + + // Select shorter side for lookup, longer side for probing + let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len { + (rows_l, l_start..l_end, rows_r, r_start..r_end) + } else { + (rows_r, r_start..r_end, rows_l, l_start..l_end) + }; + lookup_set.clear(); + lookup_set.reserve(lookup_range.len()); + + // Build lookup table + for idx in lookup_range { + lookup_set.insert(lookup_rows.row(idx)); + } + + // Probe and emit distinct intersected rows + for idx in probe_range { + let row = probe_rows.row(idx); + if lookup_set.contains(&row) && seen.insert(row) { + final_rows.push(row); } } } - result_offsets.push(last_offset + OffsetSize::usize_as(count)); + result_offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } let final_values = if final_rows.is_empty() {