diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index abc3aba88ad48..8625772e2c995 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -21,7 +21,6 @@ use crate::aggregates::group_values::multi_group_by::{ use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{ Array, ArrayRef, AsArray, BooleanBufferBuilder, ByteView, GenericByteViewArray, - make_view, }; use arrow::buffer::{Buffer, ScalarBuffer}; use arrow::datatypes::ByteViewType; @@ -176,9 +175,9 @@ impl ByteViewGroupValueBuilder { // copy them directly instead of going through value() → make_view(). self.views.extend(rows.iter().map(|&row| arr.views()[row])); } else { - // Slow path: some strings are non-inline (>12 bytes). - // Read views directly to avoid array.value(row) overhead and - // reuse the source view's prefix instead of recomputing it via make_view. + // Slow path: some strings may be non-inline (>12 bytes). + // Pre-reserve and delegate to do_append_val_inner which + // reads raw views directly and reuses source prefixes. self.views.try_reserve(rows.len()).map_err(|e| { datafusion_common::exec_datafusion_err!( "failed to reserve {0} views: {e}", @@ -186,33 +185,7 @@ impl ByteViewGroupValueBuilder { ) })?; for &row in rows { - let view = arr.views()[row]; - let len = view as u32; - if len <= 12 { - // This row happens to be inline; copy view directly. - self.views.push(view); - } else { - let src = ByteView::from(view); - // ensure_in_progress_big_enough must be called before computing - // new_buffer_index / new_offset — it may flush in_progress to completed. - self.ensure_in_progress_big_enough(len as usize); - let new_buffer_index = self.completed.len() as u32; - let new_offset = self.in_progress.len() as u32; - let src_buf = &arr.data_buffers()[src.buffer_index as usize]; - self.in_progress.extend_from_slice( - &src_buf[src.offset as usize - ..(src.offset + src.length) as usize], - ); - // Reuse prefix from the source view — avoids re-reading first 4 bytes. - let new_view = ByteView { - length: src.length, - prefix: src.prefix, - buffer_index: new_buffer_index, - offset: new_offset, - } - .as_u128(); - self.views.push(new_view); - } + self.do_append_val_inner(arr, row); } } } @@ -230,25 +203,33 @@ impl ByteViewGroupValueBuilder { where B: ByteViewType, { - let value: &[u8] = array.value(row).as_ref(); + // SAFETY: the caller ensures `row` is valid + let view = unsafe { *array.views().get_unchecked(row) }; + let len = view as u32; - let value_len = value.len(); - let view = if value_len <= 12 { - make_view(value, 0, 0) + if len <= 12 { + // Inline value: the view is already self-contained, push as-is. + self.views.push(view); } else { - // Ensure big enough block to hold the value firstly - self.ensure_in_progress_big_enough(value_len); - - // Append value - let buffer_index = self.completed.len(); - let offset = self.in_progress.len(); - self.in_progress.extend_from_slice(value); - - make_view(value, buffer_index as u32, offset as u32) - }; - - // Append view - self.views.push(view); + // Non-inline value: copy the buffer data and construct a new view + // that points into our own buffers, reusing the source prefix. + let src = ByteView::from(view); + self.ensure_in_progress_big_enough(len as usize); + let new_buffer_index = self.completed.len() as u32; + let new_offset = self.in_progress.len() as u32; + let src_buf = &array.data_buffers()[src.buffer_index as usize]; + self.in_progress.extend_from_slice( + &src_buf[src.offset as usize..(src.offset + src.length) as usize], + ); + let new_view = ByteView { + length: src.length, + prefix: src.prefix, + buffer_index: new_buffer_index, + offset: new_offset, + } + .as_u128(); + self.views.push(new_view); + } } fn ensure_in_progress_big_enough(&mut self, value_len: usize) {