Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::aggregates::group_values::multi_group_by::{
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::{
Array, ArrayRef, AsArray, BooleanBufferBuilder, ByteView, GenericByteViewArray,
make_view,
};
use arrow::buffer::{Buffer, ScalarBuffer};
use arrow::datatypes::ByteViewType;
Expand Down Expand Up @@ -176,43 +175,17 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
// copy them directly instead of going through value() → make_view().
self.views.extend(rows.iter().map(|&row| arr.views()[row]));
} else {
// Slow path: some strings are non-inline (>12 bytes).
// Read views directly to avoid array.value(row) overhead and
// reuse the source view's prefix instead of recomputing it via make_view.
// Slow path: some strings may be non-inline (>12 bytes).
// Pre-reserve and delegate to do_append_val_inner which
// reads raw views directly and reuses source prefixes.
self.views.try_reserve(rows.len()).map_err(|e| {
datafusion_common::exec_datafusion_err!(
"failed to reserve {0} views: {e}",
rows.len()
)
})?;
for &row in rows {
let view = arr.views()[row];
let len = view as u32;
if len <= 12 {
// This row happens to be inline; copy view directly.
self.views.push(view);
} else {
let src = ByteView::from(view);
// ensure_in_progress_big_enough must be called before computing
// new_buffer_index / new_offset — it may flush in_progress to completed.
self.ensure_in_progress_big_enough(len as usize);
let new_buffer_index = self.completed.len() as u32;
let new_offset = self.in_progress.len() as u32;
let src_buf = &arr.data_buffers()[src.buffer_index as usize];
self.in_progress.extend_from_slice(
&src_buf[src.offset as usize
..(src.offset + src.length) as usize],
);
// Reuse prefix from the source view — avoids re-reading first 4 bytes.
let new_view = ByteView {
length: src.length,
prefix: src.prefix,
buffer_index: new_buffer_index,
offset: new_offset,
}
.as_u128();
self.views.push(new_view);
}
self.do_append_val_inner(arr, row);
}
}
}
Expand All @@ -230,25 +203,33 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
where
B: ByteViewType,
{
let value: &[u8] = array.value(row).as_ref();
// SAFETY: the caller ensures `row` is valid
let view = unsafe { *array.views().get_unchecked(row) };
let len = view as u32;

let value_len = value.len();
let view = if value_len <= 12 {
make_view(value, 0, 0)
if len <= 12 {
// Inline value: the view is already self-contained, push as-is.
self.views.push(view);
} else {
// Ensure big enough block to hold the value firstly
self.ensure_in_progress_big_enough(value_len);

// Append value
let buffer_index = self.completed.len();
let offset = self.in_progress.len();
self.in_progress.extend_from_slice(value);

make_view(value, buffer_index as u32, offset as u32)
};

// Append view
self.views.push(view);
// Non-inline value: copy the buffer data and construct a new view
// that points into our own buffers, reusing the source prefix.
let src = ByteView::from(view);
self.ensure_in_progress_big_enough(len as usize);
let new_buffer_index = self.completed.len() as u32;
let new_offset = self.in_progress.len() as u32;
let src_buf = &array.data_buffers()[src.buffer_index as usize];
self.in_progress.extend_from_slice(
&src_buf[src.offset as usize..(src.offset + src.length) as usize],
);
let new_view = ByteView {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if it matters, but you could make this more concise by just modifying `buffer_index` and `offset`:

// Update buffer/offset and copy to output
view.buffer_index = new_buffer_index;
view.offset = new_offset;
self.views.push(view.as_u128());

length: src.length,
prefix: src.prefix,
buffer_index: new_buffer_index,
offset: new_offset,
}
.as_u128();
self.views.push(new_view);
}
}

fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
Expand Down
Loading