From f4c24d50a364085c2e338b130a663b9bb7c2cc89 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 29 Apr 2026 12:58:53 +0100 Subject: [PATCH 1/6] Add a new AggregateFn for UncompressedSize stat Signed-off-by: Adam Gutglick --- vortex-array/public-api.lock | 86 +++ vortex-array/src/aggregate_fn/fns/mod.rs | 1 + .../fns/uncompressed_size_in_bytes/bool.rs | 25 + .../fns/uncompressed_size_in_bytes/decimal.rs | 35 ++ .../uncompressed_size_in_bytes/extension.rs | 15 + .../fixed_size_list.rs | 26 + .../fns/uncompressed_size_in_bytes/list.rs | 42 ++ .../fns/uncompressed_size_in_bytes/mod.rs | 568 ++++++++++++++++++ .../fns/uncompressed_size_in_bytes/null.rs | 8 + .../uncompressed_size_in_bytes/primitive.rs | 32 + .../fns/uncompressed_size_in_bytes/struct_.rs | 33 + .../uncompressed_size_in_bytes/varbinview.rs | 44 ++ vortex-array/src/aggregate_fn/session.rs | 2 + vortex-array/src/expr/stats/mod.rs | 5 +- vortex-array/src/stats/array.rs | 18 +- 15 files changed, 928 insertions(+), 12 deletions(-) create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/null.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 
48a6af40db8..9d417916387 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -728,6 +728,56 @@ pub struct vortex_array::aggregate_fn::fns::sum::SumPartial pub fn vortex_array::aggregate_fn::fns::sum::sum(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub mod vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes + +pub struct vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes + +impl core::clone::Clone for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::clone(&self) -> vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes + +impl core::fmt::Debug for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes + +pub type vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::Options = vortex_array::aggregate_fn::EmptyOptions + +pub type vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::Partial = u64 + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn 
vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::id(&self) -> vortex_array::aggregate_fn::AggregateFnId + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::is_saturated(&self, &Self::Partial) -> bool + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::reset(&self, &mut Self::Partial) + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::to_scalar(&self, &Self::Partial) -> 
vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + pub mod vortex_array::aggregate_fn::kernels pub trait vortex_array::aggregate_fn::kernels::DynAggregateKernel: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug @@ -1264,6 +1314,42 @@ pub fn vortex_array::aggregate_fn::fns::sum::Sum::to_scalar(&self, &Self::Partia pub fn vortex_array::aggregate_fn::fns::sum::Sum::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes + +pub type vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::Options = vortex_array::aggregate_fn::EmptyOptions + +pub type vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::Partial = u64 + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn 
vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::id(&self) -> vortex_array::aggregate_fn::AggregateFnId + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::is_saturated(&self, &Self::Partial) -> bool + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::reset(&self, &mut Self::Partial) + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut 
vortex_array::ExecutionCtx) -> vortex_error::VortexResult + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::combined::Combined pub type vortex_array::aggregate_fn::combined::Combined::Options = vortex_array::aggregate_fn::combined::PairOptions<<::Left as vortex_array::aggregate_fn::AggregateFnVTable>::Options, <::Right as vortex_array::aggregate_fn::AggregateFnVTable>::Options> diff --git a/vortex-array/src/aggregate_fn/fns/mod.rs b/vortex-array/src/aggregate_fn/fns/mod.rs index f1281c18544..a3822648d2a 100644 --- a/vortex-array/src/aggregate_fn/fns/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/mod.rs @@ -10,3 +10,4 @@ pub mod mean; pub mod min_max; pub mod nan_count; pub mod sum; +pub mod uncompressed_size_in_bytes; diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs new file mode 100644 index 00000000000..6bab03af27f --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ExecutionCtx; +use crate::arrays::BoolArray; + +pub(super) fn bool_uncompressed_size_in_bytes( + array: &BoolArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let value_size = super::packed_bit_buffer_size_in_bytes(array.len())?; + let validity_size = super::validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? 
+ .execute_mask(array.as_ref().len(), ctx)?, + )?; + + value_size + .checked_add(validity_size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs new file mode 100644 index 00000000000..f53474d70ff --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ExecutionCtx; +use crate::arrays::DecimalArray; +use crate::arrays::decimal::DecimalArrayExt; +use crate::dtype::DecimalType; + +pub(super) fn decimal_uncompressed_size_in_bytes( + array: &DecimalArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let value_size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("Failed to convert decimal array length to u64: {e}"))? + .checked_mul( + u64::try_from( + DecimalType::smallest_decimal_value_type(&array.decimal_dtype()).byte_width(), + ) + .map_err(|e| vortex_err!("Failed to convert decimal byte width to u64: {e}"))?, + ) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + let validity_size = super::validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? 
+ .execute_mask(array.as_ref().len(), ctx)?, + )?; + + value_size + .checked_add(validity_size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs new file mode 100644 index 00000000000..a0b12eb2870 --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; + +use crate::ExecutionCtx; +use crate::arrays::ExtensionArray; +use crate::arrays::extension::ExtensionArrayExt; + +pub(super) fn extension_uncompressed_size_in_bytes( + array: &ExtensionArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + super::uncompressed_size_in_bytes_u64(array.storage_array(), ctx) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs new file mode 100644 index 00000000000..0640d4d8520 --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ExecutionCtx; +use crate::arrays::FixedSizeListArray; +use crate::arrays::fixed_size_list::FixedSizeListArrayExt; + +pub(super) fn fixed_size_list_uncompressed_size_in_bytes( + array: &FixedSizeListArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let elements_size = super::uncompressed_size_in_bytes_u64(array.elements(), ctx)?; + let validity_size = super::validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? 
+ .execute_mask(array.as_ref().len(), ctx)?, + )?; + + elements_size + .checked_add(validity_size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list.rs new file mode 100644 index 00000000000..733556ded29 --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list.rs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ExecutionCtx; +use crate::arrays::ListViewArray; +use crate::arrays::listview::ListViewArrayExt; +use crate::arrays::listview::ListViewRebuildMode; + +pub(super) fn list_uncompressed_size_in_bytes( + array: &ListViewArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let mut size = if array.is_empty() { + 0 + } else { + let rebuilt = array.rebuild(ListViewRebuildMode::MakeExact)?; + super::uncompressed_size_in_bytes_u64(rebuilt.elements(), ctx)? + }; + + let view_buffer_size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("Failed to convert list array length to u64: {e}"))? + .checked_mul(8) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + size = size + .checked_add(view_buffer_size) + .and_then(|size| size.checked_add(view_buffer_size)) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + size = size + .checked_add(super::validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)?, + )?) 
+ .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + Ok(size) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs new file mode 100644 index 00000000000..6d2081acafc --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -0,0 +1,568 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod bool; +mod decimal; +mod extension; +mod fixed_size_list; +mod list; +mod null; +mod primitive; +mod struct_; +mod varbinview; + +use bool::bool_uncompressed_size_in_bytes; +use decimal::decimal_uncompressed_size_in_bytes; +use extension::extension_uncompressed_size_in_bytes; +use fixed_size_list::fixed_size_list_uncompressed_size_in_bytes; +use list::list_uncompressed_size_in_bytes; +use null::null_uncompressed_size_in_bytes; +use primitive::primitive_uncompressed_size_in_bytes; +use struct_::struct_uncompressed_size_in_bytes; +use varbinview::varbinview_uncompressed_size_in_bytes; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::ArrayRef; +use crate::Canonical; +use crate::Columnar; +use crate::ExecutionCtx; +use crate::IntoArray; +use crate::aggregate_fn::Accumulator; +use crate::aggregate_fn::AggregateFnId; +use crate::aggregate_fn::AggregateFnVTable; +use crate::aggregate_fn::DynAccumulator; +use crate::aggregate_fn::EmptyOptions; +use crate::arrays::ConstantArray; +use crate::builders::builder_with_capacity; +use crate::dtype::DType; +use crate::dtype::Nullability::NonNullable; +use crate::dtype::PType; +use crate::expr::stats::Precision; +use crate::expr::stats::Stat; +use crate::expr::stats::StatsProvider; +use crate::scalar::Scalar; +use crate::scalar::ScalarValue; + +/// Return the uncompressed size of an array in bytes. 
+/// +/// See [`UncompressedSizeInBytes`] for details. +pub fn uncompressed_size_in_bytes(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let size = uncompressed_size_in_bytes_u64(array, ctx)?; + + usize::try_from(size) + .map_err(|e| vortex_err!("Failed to convert uncompressed size to usize: {e}")) +} + +fn uncompressed_size_in_bytes_u64(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + if let Some(Precision::Exact(size_scalar)) = + array.statistics().get(Stat::UncompressedSizeInBytes) + { + return u64::try_from(&size_scalar) + .map_err(|e| vortex_err!("Failed to convert uncompressed size stat to u64: {e}")); + } + + let mut acc = + Accumulator::try_new(UncompressedSizeInBytes, EmptyOptions, array.dtype().clone())?; + acc.accumulate(array, ctx)?; + let result = acc.finish()?; + + let size = result + .as_primitive() + .typed_value::() + .vortex_expect("uncompressed_size_in_bytes result should not be null"); + + array.statistics().set( + Stat::UncompressedSizeInBytes, + Precision::Exact(ScalarValue::from(size)), + ); + + Ok(size) +} + +/// Sum the canonical, recursively uncompressed buffer sizes for an array. +/// +/// Applies to all `Variant`-free types and returns a non-null `u64`. Encoding kernels can return +/// this aggregate directly from metadata to avoid decoding arrays whose uncompressed size is known. 
+#[derive(Clone, Debug)] +pub struct UncompressedSizeInBytes; + +impl AggregateFnVTable for UncompressedSizeInBytes { + type Options = EmptyOptions; + type Partial = u64; + + fn id(&self) -> AggregateFnId { + AggregateFnId::new("vortex.uncompressed_size_in_bytes") + } + + fn serialize(&self, _options: &Self::Options) -> VortexResult>> { + unimplemented!("UncompressedSizeInBytes is not yet serializable"); + } + + fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option { + supports_uncompressed_size_in_bytes(_input_dtype) + .then_some(DType::Primitive(PType::U64, NonNullable)) + } + + fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option { + self.return_dtype(options, input_dtype) + } + + fn empty_partial( + &self, + _options: &Self::Options, + _input_dtype: &DType, + ) -> VortexResult { + Ok(0) + } + + fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> { + let size = other + .as_primitive() + .typed_value::() + .vortex_expect("uncompressed_size_in_bytes partial should not be null"); + *partial = partial + .checked_add(size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + Ok(()) + } + + fn to_scalar(&self, partial: &Self::Partial) -> VortexResult { + Ok(Scalar::primitive(*partial, NonNullable)) + } + + fn reset(&self, partial: &mut Self::Partial) { + *partial = 0; + } + + #[inline] + fn is_saturated(&self, _partial: &Self::Partial) -> bool { + false + } + + fn try_accumulate( + &self, + partial: &mut Self::Partial, + batch: &ArrayRef, + _ctx: &mut ExecutionCtx, + ) -> VortexResult { + let Some(Precision::Exact(size_scalar)) = + batch.statistics().get(Stat::UncompressedSizeInBytes) + else { + return Ok(false); + }; + + let size = u64::try_from(&size_scalar) + .map_err(|e| vortex_err!("Failed to convert uncompressed size stat to u64: {e}"))?; + *partial = partial + .checked_add(size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes 
overflowed u64"))?; + Ok(true) + } + + fn accumulate( + &self, + partial: &mut Self::Partial, + batch: &Columnar, + ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + let size = match batch { + Columnar::Canonical(canonical) => canonical_uncompressed_size_in_bytes(canonical, ctx)?, + Columnar::Constant(constant) => { + let array = constant.clone().into_array(); + materialized_uncompressed_size_in_bytes(&array) + } + }; + *partial = partial + .checked_add(size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + Ok(()) + } + + fn finalize(&self, partials: ArrayRef) -> VortexResult { + Ok(partials) + } + + fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult { + self.to_scalar(partial) + } +} + +fn canonical_uncompressed_size_in_bytes( + canonical: &Canonical, + ctx: &mut ExecutionCtx, +) -> VortexResult { + match canonical { + Canonical::Null(array) => Ok(null_uncompressed_size_in_bytes(array)), + Canonical::Bool(array) => bool_uncompressed_size_in_bytes(array, ctx), + Canonical::Primitive(array) => primitive_uncompressed_size_in_bytes(array, ctx), + Canonical::Decimal(array) => decimal_uncompressed_size_in_bytes(array, ctx), + Canonical::VarBinView(array) => varbinview_uncompressed_size_in_bytes(array, ctx), + Canonical::List(array) => list_uncompressed_size_in_bytes(array, ctx), + Canonical::FixedSizeList(array) => fixed_size_list_uncompressed_size_in_bytes(array, ctx), + Canonical::Struct(array) => struct_uncompressed_size_in_bytes(array, ctx), + Canonical::Extension(array) => extension_uncompressed_size_in_bytes(array, ctx), + Canonical::Variant(_) => { + vortex_bail!("UncompressedSizeInBytes is not supported for Variant arrays") + } + } +} + +fn supports_uncompressed_size_in_bytes(dtype: &DType) -> bool { + match dtype { + DType::List(element_dtype, _) | DType::FixedSizeList(element_dtype, ..) 
=> { + supports_uncompressed_size_in_bytes(element_dtype) + } + DType::Struct(fields, _) => fields + .fields() + .all(|field| supports_uncompressed_size_in_bytes(&field)), + DType::Extension(ext_dtype) => { + supports_uncompressed_size_in_bytes(ext_dtype.storage_dtype()) + } + DType::Variant(_) => false, + DType::Null + | DType::Bool(_) + | DType::Primitive(..) + | DType::Decimal(..) + | DType::Utf8(_) + | DType::Binary(_) => true, + } +} + +fn materialized_uncompressed_size_in_bytes(array: &ArrayRef) -> u64 { + let mut builder = builder_with_capacity(array.dtype(), array.len()); + unsafe { + builder.extend_from_array_unchecked(array); + } + builder.finish().nbytes() +} + +fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { + match validity { + Mask::AllTrue(_) => Ok(0), + Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), + Mask::Values(values) => packed_bit_buffer_size_in_bytes(values.len()), + } +} + +fn packed_bit_buffer_size_in_bytes(len: usize) -> VortexResult { + u64::try_from(len.div_ceil(8)) + .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")) +} + +#[cfg(test)] +mod tests { + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_error::vortex_err; + + use crate::ArrayRef; + use crate::IntoArray; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; + use crate::aggregate_fn::Accumulator; + use crate::aggregate_fn::AggregateFnVTable; + use crate::aggregate_fn::DynAccumulator; + use crate::aggregate_fn::EmptyOptions; + use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; + use crate::aggregate_fn::fns::uncompressed_size_in_bytes::materialized_uncompressed_size_in_bytes; + use crate::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes; + use crate::arrays::BoolArray; + use crate::arrays::ChunkedArray; + use crate::arrays::ConstantArray; + use crate::arrays::DecimalArray; + use 
crate::arrays::ExtensionArray; + use crate::arrays::FixedSizeListArray; + use crate::arrays::ListViewArray; + use crate::arrays::NullArray; + use crate::arrays::PrimitiveArray; + use crate::arrays::StructArray; + use crate::arrays::VarBinViewArray; + use crate::arrays::VariantArray; + use crate::dtype::DType; + use crate::dtype::DecimalDType; + use crate::dtype::FieldNames; + use crate::dtype::Nullability; + use crate::dtype::PType; + use crate::expr::stats::Precision; + use crate::expr::stats::Stat; + use crate::expr::stats::StatsProvider; + use crate::extension::datetime::Date; + use crate::extension::datetime::TimeUnit; + use crate::scalar::Scalar; + use crate::scalar::ScalarValue; + use crate::validity::Validity; + + fn aggregate(array: &ArrayRef) -> VortexResult { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut acc = + Accumulator::try_new(UncompressedSizeInBytes, EmptyOptions, array.dtype().clone())?; + acc.accumulate(array, &mut ctx)?; + acc.finish()? + .as_primitive() + .typed_value::() + .ok_or_else(|| vortex_err!("uncompressed size result should not be null")) + } + + #[test] + fn primitive_matches_materialized_size() -> VortexResult<()> { + let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn nullable_primitive_matches_materialized_size() -> VortexResult<()> { + let array = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn all_invalid_primitive_matches_materialized_size() -> VortexResult<()> { + let array = PrimitiveArray::new(buffer![0i32, 0, 0], Validity::AllInvalid).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn 
bool_matches_materialized_size() -> VortexResult<()> { + let array = BoolArray::from_iter([true, false, true, true, false]).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn nullable_bool_matches_materialized_size() -> VortexResult<()> { + let array = BoolArray::from_iter([Some(true), None, Some(false), Some(true)]).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn all_invalid_bool_matches_materialized_size() -> VortexResult<()> { + let array = BoolArray::from_iter([None::, None, None]).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn null_matches_materialized_size() -> VortexResult<()> { + let array = NullArray::new(5).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn decimal_matches_materialized_size() -> VortexResult<()> { + let array = DecimalArray::new( + buffer![12345i64, -123i64, 0i64], + DecimalDType::new(5, 2), + Validity::NonNullable, + ) + .into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn varbinview_matches_materialized_size() -> VortexResult<()> { + let array = VarBinViewArray::from_iter_nullable_str([ + Some("short"), + None, + Some("this string is longer than twelve bytes"), + ]) + .into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn list_matches_materialized_size() -> VortexResult<()> { + let elements = + PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array(); + let offsets = buffer![2u32, 0].into_array(); + let sizes = buffer![2u32, 1].into_array(); + let array = + ListViewArray::new(elements, offsets, sizes, 
Validity::NonNullable).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn fixed_size_list_matches_materialized_size() -> VortexResult<()> { + let elements = + PrimitiveArray::from_option_iter([Some(1i32), None, Some(3), Some(4)]).into_array(); + let array = FixedSizeListArray::new(elements, 2, Validity::NonNullable, 2).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn struct_matches_materialized_size() -> VortexResult<()> { + let ints = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array(); + let strings = VarBinViewArray::from_iter_nullable_str([Some("alpha"), None, Some("omega")]) + .into_array(); + let array = StructArray::try_new( + FieldNames::from(["ints", "strings"]), + vec![ints, strings], + 3, + Validity::NonNullable, + )? + .into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn extension_matches_materialized_size() -> VortexResult<()> { + let storage = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array(); + let ext_dtype = Date::new(TimeUnit::Days, Nullability::Nullable).erased(); + let array = ExtensionArray::new(ext_dtype, storage).into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn variant_stat_is_unsupported() -> VortexResult<()> { + let child = ConstantArray::new(Scalar::variant(Scalar::from(42i32)), 3).into_array(); + let array = VariantArray::new(child).into_array(); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + + assert_eq!( + array + .statistics() + .compute_uncompressed_size_in_bytes(&mut ctx), + None + ); + Ok(()) + } + + #[test] + fn constant_matches_materialized_size() -> VortexResult<()> { + let array = ConstantArray::new(42i32, 10).into_array(); + + 
assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn chunked_sums_chunk_sizes() -> VortexResult<()> { + let chunk1 = PrimitiveArray::new(buffer![1i32, 2, 3], Validity::NonNullable).into_array(); + let chunk2 = PrimitiveArray::new(buffer![4i32, 5], Validity::NonNullable).into_array(); + let expected = materialized_uncompressed_size_in_bytes(&chunk1) + + materialized_uncompressed_size_in_bytes(&chunk2); + let chunked = ChunkedArray::try_new( + vec![chunk1, chunk2], + DType::Primitive(PType::I32, Nullability::NonNullable), + )? + .into_array(); + + assert_eq!(aggregate(&chunked)?, expected); + Ok(()) + } + + #[test] + fn uses_cached_exact_stat() -> VortexResult<()> { + let array = ConstantArray::new(42i32, 10).into_array(); + array.statistics().set( + Stat::UncompressedSizeInBytes, + Precision::Exact(ScalarValue::from(123u64)), + ); + + assert_eq!(aggregate(&array)?, 123); + Ok(()) + } + + #[test] + fn helper_caches_result() -> VortexResult<()> { + let array = PrimitiveArray::new(buffer![1i32, 2, 3], Validity::NonNullable).into_array(); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + + let size = uncompressed_size_in_bytes(&array, &mut ctx)?; + + assert_eq!( + array.statistics().get(Stat::UncompressedSizeInBytes), + Some(Precision::exact(u64::try_from(size)?)) + ); + Ok(()) + } + + #[test] + fn state_merge() -> VortexResult<()> { + let dtype = DType::Primitive(PType::I32, Nullability::NonNullable); + let mut state = UncompressedSizeInBytes.empty_partial(&EmptyOptions, &dtype)?; + + UncompressedSizeInBytes.combine_partials( + &mut state, + Scalar::primitive(5u64, Nullability::NonNullable), + )?; + UncompressedSizeInBytes.combine_partials( + &mut state, + Scalar::primitive(3u64, Nullability::NonNullable), + )?; + + let result = UncompressedSizeInBytes.to_scalar(&state)?; + UncompressedSizeInBytes.reset(&mut state); + assert_eq!(result.as_primitive().typed_value::(), Some(8)); + Ok(()) + } +} diff 
--git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/null.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/null.rs new file mode 100644 index 00000000000..e2687e9d19d --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/null.rs @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use crate::arrays::NullArray; + +pub(super) fn null_uncompressed_size_in_bytes(_array: &NullArray) -> u64 { + 0 +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs new file mode 100644 index 00000000000..3b9a06cbcc3 --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ExecutionCtx; +use crate::arrays::PrimitiveArray; +use crate::arrays::primitive::PrimitiveArrayExt; + +pub(super) fn primitive_uncompressed_size_in_bytes( + array: &PrimitiveArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let value_size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("Failed to convert primitive array length to u64: {e}"))? + .checked_mul( + u64::try_from(array.ptype().byte_width()) + .map_err(|e| vortex_err!("Failed to convert primitive byte width to u64: {e}"))?, + ) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + let validity_size = super::validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? 
+ .execute_mask(array.as_ref().len(), ctx)?, + )?; + + value_size + .checked_add(validity_size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs new file mode 100644 index 00000000000..025a33a3063 --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ExecutionCtx; +use crate::arrays::StructArray; +use crate::arrays::struct_::StructArrayExt; + +pub(super) fn struct_uncompressed_size_in_bytes( + array: &StructArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let mut size = 0u64; + + for field in array.iter_unmasked_fields() { + size = size + .checked_add(super::uncompressed_size_in_bytes_u64(field, ctx)?) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + } + + size = size + .checked_add(super::validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)?, + )?) 
+ .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + Ok(size) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs new file mode 100644 index 00000000000..851af14a8df --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::mem::size_of; + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ExecutionCtx; +use crate::arrays::VarBinViewArray; +use crate::arrays::varbinview::BinaryView; + +pub(super) fn varbinview_uncompressed_size_in_bytes( + array: &VarBinViewArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let mut size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("Failed to convert varbinview array length to u64: {e}"))? + .checked_mul( + u64::try_from(size_of::()) + .map_err(|e| vortex_err!("Failed to convert binary view width to u64: {e}"))?, + ) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + for buffer in array.data_buffers().iter() { + size = size + .checked_add( + u64::try_from(buffer.len()) + .map_err(|e| vortex_err!("Failed to convert data buffer length to u64: {e}"))?, + ) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + } + + size = size + .checked_add(super::validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)?, + )?) 
+ .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + Ok(size) +} diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 17720398f6c..d04f0dcd29a 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -21,6 +21,7 @@ use crate::aggregate_fn::fns::last::Last; use crate::aggregate_fn::fns::min_max::MinMax; use crate::aggregate_fn::fns::nan_count::NanCount; use crate::aggregate_fn::fns::sum::Sum; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::aggregate_fn::kernels::DynGroupedAggregateKernel; use crate::array::ArrayId; @@ -72,6 +73,7 @@ impl Default for AggregateFnSession { this.register(MinMax); this.register(NanCount); this.register(Sum); + this.register(UncompressedSizeInBytes); // Register the built-in aggregate kernels. this.register_aggregate_kernel(Chunked.id(), None::, &ChunkedArrayAggregate); diff --git a/vortex-array/src/expr/stats/mod.rs b/vortex-array/src/expr/stats/mod.rs index 53f50bbb9e0..a06be0b21f8 100644 --- a/vortex-array/src/expr/stats/mod.rs +++ b/vortex-array/src/expr/stats/mod.rs @@ -173,7 +173,10 @@ impl Stat { Self::Min if matches!(data_type, DType::Null) => return None, Self::Min => data_type.clone(), Self::NullCount => DType::Primitive(PType::U64, NonNullable), - Self::UncompressedSizeInBytes => DType::Primitive(PType::U64, NonNullable), + Self::UncompressedSizeInBytes => { + return aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes + .return_dtype(&EmptyOptions, data_type); + } Self::NaNCount => { return aggregate_fn::fns::nan_count::NanCount .return_dtype(&EmptyOptions, data_type); diff --git a/vortex-array/src/stats/array.rs b/vortex-array/src/stats/array.rs index 248ca3587f2..fd41090c528 100644 --- a/vortex-array/src/stats/array.rs +++ b/vortex-array/src/stats/array.rs @@ -23,7 +23,7 @@ use 
crate::aggregate_fn::fns::min_max::MinMaxResult; use crate::aggregate_fn::fns::min_max::min_max; use crate::aggregate_fn::fns::nan_count::nan_count; use crate::aggregate_fn::fns::sum::sum; -use crate::builders::builder_with_capacity; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes; use crate::expr::stats::Precision; use crate::expr::stats::Stat; use crate::expr::stats::StatsProvider; @@ -182,16 +182,12 @@ impl StatsSetRef<'_> { } Stat::IsSorted => Some(is_sorted(self.dyn_array_ref, ctx)?.into()), Stat::IsStrictSorted => Some(is_strict_sorted(self.dyn_array_ref, ctx)?.into()), - Stat::UncompressedSizeInBytes => { - let mut builder = - builder_with_capacity(self.dyn_array_ref.dtype(), self.dyn_array_ref.len()); - unsafe { - builder.extend_from_array_unchecked(self.dyn_array_ref); - } - let nbytes = builder.finish().nbytes(); - self.set(stat, Precision::exact(nbytes)); - Some(nbytes.into()) - } + Stat::UncompressedSizeInBytes => Stat::UncompressedSizeInBytes + .dtype(self.dyn_array_ref.dtype()) + .is_some() + .then(|| uncompressed_size_in_bytes(self.dyn_array_ref, ctx)) + .transpose()? 
+ .map(|s| s.into()), Stat::NaNCount => { Stat::NaNCount .dtype(self.dyn_array_ref.dtype()) From 88a81cc7a8008e099253650604350fa20400f1c6 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 30 Apr 2026 09:22:00 +0100 Subject: [PATCH 2/6] CR Signed-off-by: Adam Gutglick --- .../aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs | 6 ++++-- .../fns/uncompressed_size_in_bytes/decimal.rs | 3 ++- .../fns/uncompressed_size_in_bytes/extension.rs | 3 ++- .../fns/uncompressed_size_in_bytes/fixed_size_list.rs | 6 ++++-- .../uncompressed_size_in_bytes/{list.rs => list_view.rs} | 9 ++++++--- .../aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs | 6 +++--- .../fns/uncompressed_size_in_bytes/primitive.rs | 3 ++- .../fns/uncompressed_size_in_bytes/struct_.rs | 6 ++++-- .../fns/uncompressed_size_in_bytes/varbinview.rs | 3 ++- 9 files changed, 29 insertions(+), 16 deletions(-) rename vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/{list.rs => list_view.rs} (79%) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs index 6bab03af27f..4c5c0a4df8d 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs @@ -4,6 +4,8 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; +use super::packed_bit_buffer_size_in_bytes; +use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::BoolArray; @@ -11,8 +13,8 @@ pub(super) fn bool_uncompressed_size_in_bytes( array: &BoolArray, ctx: &mut ExecutionCtx, ) -> VortexResult { - let value_size = super::packed_bit_buffer_size_in_bytes(array.len())?; - let validity_size = super::validity_uncompressed_size_in_bytes( + let value_size = packed_bit_buffer_size_in_bytes(array.len())?; + let validity_size = validity_uncompressed_size_in_bytes( array .as_ref() .validity()? 
diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs index f53474d70ff..c04fa55f0ab 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs @@ -4,6 +4,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; +use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::DecimalArray; use crate::arrays::decimal::DecimalArrayExt; @@ -22,7 +23,7 @@ pub(super) fn decimal_uncompressed_size_in_bytes( .map_err(|e| vortex_err!("Failed to convert decimal byte width to u64: {e}"))?, ) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; - let validity_size = super::validity_uncompressed_size_in_bytes( + let validity_size = validity_uncompressed_size_in_bytes( array .as_ref() .validity()? diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs index a0b12eb2870..a125c417ea3 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/extension.rs @@ -3,6 +3,7 @@ use vortex_error::VortexResult; +use super::uncompressed_size_in_bytes_u64; use crate::ExecutionCtx; use crate::arrays::ExtensionArray; use crate::arrays::extension::ExtensionArrayExt; @@ -11,5 +12,5 @@ pub(super) fn extension_uncompressed_size_in_bytes( array: &ExtensionArray, ctx: &mut ExecutionCtx, ) -> VortexResult { - super::uncompressed_size_in_bytes_u64(array.storage_array(), ctx) + uncompressed_size_in_bytes_u64(array.storage_array(), ctx) } diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs index 0640d4d8520..7f8d55b07c1 
100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs @@ -4,6 +4,8 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; +use super::uncompressed_size_in_bytes_u64; +use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::FixedSizeListArray; use crate::arrays::fixed_size_list::FixedSizeListArrayExt; @@ -12,8 +14,8 @@ pub(super) fn fixed_size_list_uncompressed_size_in_bytes( array: &FixedSizeListArray, ctx: &mut ExecutionCtx, ) -> VortexResult { - let elements_size = super::uncompressed_size_in_bytes_u64(array.elements(), ctx)?; - let validity_size = super::validity_uncompressed_size_in_bytes( + let elements_size = uncompressed_size_in_bytes_u64(array.elements(), ctx)?; + let validity_size = validity_uncompressed_size_in_bytes( array .as_ref() .validity()? diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs similarity index 79% rename from vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list.rs rename to vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs index 733556ded29..919430195d9 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs @@ -4,12 +4,14 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; +use super::uncompressed_size_in_bytes_u64; +use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::ListViewArray; use crate::arrays::listview::ListViewArrayExt; use crate::arrays::listview::ListViewRebuildMode; -pub(super) fn list_uncompressed_size_in_bytes( +pub(super) fn list_view_uncompressed_size_in_bytes( array: &ListViewArray, ctx: &mut ExecutionCtx, ) -> VortexResult { @@ -17,7 +19,7 @@ 
pub(super) fn list_uncompressed_size_in_bytes( 0 } else { let rebuilt = array.rebuild(ListViewRebuildMode::MakeExact)?; - super::uncompressed_size_in_bytes_u64(rebuilt.elements(), ctx)? + uncompressed_size_in_bytes_u64(rebuilt.elements(), ctx)? }; let view_buffer_size = u64::try_from(array.len()) @@ -25,12 +27,13 @@ pub(super) fn list_uncompressed_size_in_bytes( .checked_mul(8) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + // ListView stores both offsets and sizes as u64 view buffers. size = size .checked_add(view_buffer_size) .and_then(|size| size.checked_add(view_buffer_size)) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; size = size - .checked_add(super::validity_uncompressed_size_in_bytes( + .checked_add(validity_uncompressed_size_in_bytes( array .as_ref() .validity()? diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index 6d2081acafc..dbf700c4c9b 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -5,7 +5,7 @@ mod bool; mod decimal; mod extension; mod fixed_size_list; -mod list; +mod list_view; mod null; mod primitive; mod struct_; @@ -15,7 +15,7 @@ use bool::bool_uncompressed_size_in_bytes; use decimal::decimal_uncompressed_size_in_bytes; use extension::extension_uncompressed_size_in_bytes; use fixed_size_list::fixed_size_list_uncompressed_size_in_bytes; -use list::list_uncompressed_size_in_bytes; +use list_view::list_view_uncompressed_size_in_bytes; use null::null_uncompressed_size_in_bytes; use primitive::primitive_uncompressed_size_in_bytes; use struct_::struct_uncompressed_size_in_bytes; @@ -201,7 +201,7 @@ fn canonical_uncompressed_size_in_bytes( Canonical::Primitive(array) => primitive_uncompressed_size_in_bytes(array, ctx), Canonical::Decimal(array) => 
decimal_uncompressed_size_in_bytes(array, ctx), Canonical::VarBinView(array) => varbinview_uncompressed_size_in_bytes(array, ctx), - Canonical::List(array) => list_uncompressed_size_in_bytes(array, ctx), + Canonical::List(array) => list_view_uncompressed_size_in_bytes(array, ctx), Canonical::FixedSizeList(array) => fixed_size_list_uncompressed_size_in_bytes(array, ctx), Canonical::Struct(array) => struct_uncompressed_size_in_bytes(array, ctx), Canonical::Extension(array) => extension_uncompressed_size_in_bytes(array, ctx), diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs index 3b9a06cbcc3..8ca149e791a 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs @@ -4,6 +4,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; +use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::PrimitiveArray; use crate::arrays::primitive::PrimitiveArrayExt; @@ -19,7 +20,7 @@ pub(super) fn primitive_uncompressed_size_in_bytes( .map_err(|e| vortex_err!("Failed to convert primitive byte width to u64: {e}"))?, ) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; - let validity_size = super::validity_uncompressed_size_in_bytes( + let validity_size = validity_uncompressed_size_in_bytes( array .as_ref() .validity()? 
diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs index 025a33a3063..ffed40babd2 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs @@ -4,6 +4,8 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; +use super::uncompressed_size_in_bytes_u64; +use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::StructArray; use crate::arrays::struct_::StructArrayExt; @@ -16,12 +18,12 @@ pub(super) fn struct_uncompressed_size_in_bytes( for field in array.iter_unmasked_fields() { size = size - .checked_add(super::uncompressed_size_in_bytes_u64(field, ctx)?) + .checked_add(uncompressed_size_in_bytes_u64(field, ctx)?) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; } size = size - .checked_add(super::validity_uncompressed_size_in_bytes( + .checked_add(validity_uncompressed_size_in_bytes( array .as_ref() .validity()? diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs index 851af14a8df..820ec8ca88d 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs @@ -6,6 +6,7 @@ use std::mem::size_of; use vortex_error::VortexResult; use vortex_error::vortex_err; +use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::VarBinViewArray; use crate::arrays::varbinview::BinaryView; @@ -32,7 +33,7 @@ pub(super) fn varbinview_uncompressed_size_in_bytes( } size = size - .checked_add(super::validity_uncompressed_size_in_bytes( + .checked_add(validity_uncompressed_size_in_bytes( array .as_ref() .validity()? 
From 59ba0890266b9d958a7d6a278ff6332325a688a9 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 1 May 2026 11:59:59 +0100 Subject: [PATCH 3/6] Update doc Signed-off-by: Adam Gutglick --- .../aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index dbf700c4c9b..1c9a97fcdb4 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -83,10 +83,12 @@ fn uncompressed_size_in_bytes_u64(array: &ArrayRef, ctx: &mut ExecutionCtx) -> V Ok(size) } -/// Sum the canonical, recursively uncompressed buffer sizes for an array. +/// Sum the canonical, recursively **uncompressed** data size for an array. /// /// Applies to all types and returns a non-null `u64`. Encoding kernels can return this aggregate /// directly from metadata to avoid decoding arrays whose uncompressed size is known. +/// +/// This is generally useful for various execution engines to pick better join orderings. 
#[derive(Clone, Debug)] pub struct UncompressedSizeInBytes; @@ -102,8 +104,8 @@ impl AggregateFnVTable for UncompressedSizeInBytes { unimplemented!("UncompressedSizeInBytes is not yet serializable"); } - fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option { - supports_uncompressed_size_in_bytes(_input_dtype) + fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { + supports_uncompressed_size_in_bytes(input_dtype) .then_some(DType::Primitive(PType::U64, NonNullable)) } From 673cf827070ed396cd238d4ec1430bafc7b7cdec Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 7 May 2026 16:01:27 +0100 Subject: [PATCH 4/6] Constant kernel Signed-off-by: Adam Gutglick --- .../uncompressed_size_in_bytes/list_view.rs | 4 +- .../fns/uncompressed_size_in_bytes/mod.rs | 85 +++++++++++- vortex-array/src/aggregate_fn/session.rs | 7 + .../src/arrays/constant/compute/mod.rs | 1 + .../constant/compute/uncompressed_size.rs | 128 ++++++++++++++++++ 5 files changed, 217 insertions(+), 8 deletions(-) create mode 100644 vortex-array/src/arrays/constant/compute/uncompressed_size.rs diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs index 919430195d9..39f73d9fbc0 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs @@ -9,7 +9,6 @@ use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::ListViewArray; use crate::arrays::listview::ListViewArrayExt; -use crate::arrays::listview::ListViewRebuildMode; pub(super) fn list_view_uncompressed_size_in_bytes( array: &ListViewArray, @@ -18,8 +17,7 @@ pub(super) fn list_view_uncompressed_size_in_bytes( let mut size = if array.is_empty() { 0 } else { - let rebuilt = array.rebuild(ListViewRebuildMode::MakeExact)?; - 
uncompressed_size_in_bytes_u64(rebuilt.elements(), ctx)? + uncompressed_size_in_bytes_u64(array.elements(), ctx)? }; let view_buffer_size = u64::try_from(array.len()) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index 1c9a97fcdb4..aa166c6c478 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -11,6 +11,8 @@ mod primitive; mod struct_; mod varbinview; +use std::mem::size_of; + use bool::bool_uncompressed_size_in_bytes; use decimal::decimal_uncompressed_size_in_bytes; use extension::extension_uncompressed_size_in_bytes; @@ -36,9 +38,14 @@ use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnVTable; use crate::aggregate_fn::DynAccumulator; use crate::aggregate_fn::EmptyOptions; +use crate::array::ArrayView; +use crate::arrays::Constant; use crate::arrays::ConstantArray; +use crate::arrays::varbinview::BinaryView; +#[cfg(test)] use crate::builders::builder_with_capacity; use crate::dtype::DType; +use crate::dtype::DecimalType; use crate::dtype::Nullability::NonNullable; use crate::dtype::PType; use crate::expr::stats::Precision; @@ -174,8 +181,7 @@ impl AggregateFnVTable for UncompressedSizeInBytes { let size = match batch { Columnar::Canonical(canonical) => canonical_uncompressed_size_in_bytes(canonical, ctx)?, Columnar::Constant(constant) => { - let array = constant.clone().into_array(); - materialized_uncompressed_size_in_bytes(&array) + constant_uncompressed_size_in_bytes(constant.as_view(), ctx)? 
} }; *partial = partial @@ -193,7 +199,7 @@ impl AggregateFnVTable for UncompressedSizeInBytes { } } -fn canonical_uncompressed_size_in_bytes( +pub(crate) fn canonical_uncompressed_size_in_bytes( canonical: &Canonical, ctx: &mut ExecutionCtx, ) -> VortexResult { @@ -213,6 +219,74 @@ fn canonical_uncompressed_size_in_bytes( } } +pub(crate) fn constant_uncompressed_size_in_bytes( + array: ArrayView<'_, Constant>, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let value_size = match array.dtype() { + DType::Null => return Ok(0), + DType::Bool(_) => packed_bit_buffer_size_in_bytes(array.len())?, + DType::Primitive(ptype, _) => { + checked_len_mul(array.len(), ptype.byte_width(), "primitive")? + } + DType::Decimal(decimal_type, _) => checked_len_mul( + array.len(), + DecimalType::smallest_decimal_value_type(decimal_type).byte_width(), + "decimal", + )?, + DType::Utf8(_) => constant_varbinview_value_size( + array.len(), + array.scalar().as_utf8().value().map(|value| value.len()), + )?, + DType::Binary(_) => constant_varbinview_value_size( + array.len(), + array.scalar().as_binary().value().map(|value| value.len()), + )?, + DType::Variant(_) => { + vortex_bail!("UncompressedSizeInBytes is not supported for Variant arrays") + } + DType::Struct(..) | DType::List(..) | DType::FixedSizeList(..) | DType::Extension(_) => { + let canonical = array.array().clone().execute::(ctx)?; + return canonical_uncompressed_size_in_bytes(&canonical, ctx); + } + }; + + value_size + .checked_add(constant_validity_size(array, ctx)?) 
+ .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} + +fn constant_varbinview_value_size(len: usize, scalar_len: Option) -> VortexResult { + let views_size = checked_len_mul(len, size_of::(), "binary view")?; + let data_size = match scalar_len { + Some(scalar_len) if scalar_len >= BinaryView::MAX_INLINED_SIZE => u64::try_from(scalar_len) + .map_err(|e| vortex_err!("Failed to convert data buffer length to u64: {e}"))?, + _ => 0, + }; + + views_size + .checked_add(data_size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} + +fn constant_validity_size( + array: ArrayView<'_, Constant>, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let validity = array.validity()?.execute_mask(array.len(), ctx)?; + validity_uncompressed_size_in_bytes(validity) +} + +fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { + let len = u64::try_from(len) + .map_err(|e| vortex_err!("Failed to convert {name} length to u64: {e}"))?; + let width = u64::try_from(width) + .map_err(|e| vortex_err!("Failed to convert {name} byte width to u64: {e}"))?; + + len.checked_mul(width) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} + fn supports_uncompressed_size_in_bytes(dtype: &DType) -> bool { match dtype { DType::List(element_dtype, _) | DType::FixedSizeList(element_dtype, ..) 
=> { @@ -234,6 +308,7 @@ fn supports_uncompressed_size_in_bytes(dtype: &DType) -> bool { } } +#[cfg(test)] fn materialized_uncompressed_size_in_bytes(array: &ArrayRef) -> u64 { let mut builder = builder_with_capacity(array.dtype(), array.len()); unsafe { @@ -242,7 +317,7 @@ fn materialized_uncompressed_size_in_bytes(array: &ArrayRef) -> u64 { builder.finish().nbytes() } -fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { +pub(crate) fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { match validity { Mask::AllTrue(_) => Ok(0), Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), @@ -250,7 +325,7 @@ fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { } } -fn packed_bit_buffer_size_in_bytes(len: usize) -> VortexResult { +pub(crate) fn packed_bit_buffer_size_in_bytes(len: usize) -> VortexResult { u64::try_from(len.div_ceil(8)) .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")) } diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index d04f0dcd29a..bde19e93ac6 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -27,8 +27,10 @@ use crate::aggregate_fn::kernels::DynGroupedAggregateKernel; use crate::array::ArrayId; use crate::array::VTable; use crate::arrays::Chunked; +use crate::arrays::Constant; use crate::arrays::Dict; use crate::arrays::chunked::compute::aggregate::ChunkedArrayAggregate; +use crate::arrays::constant::compute::uncompressed_size::ConstantUncompressedSizeKernel; use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; @@ -77,6 +79,11 @@ impl Default for AggregateFnSession { // Register the built-in aggregate kernels. 
this.register_aggregate_kernel(Chunked.id(), None::, &ChunkedArrayAggregate); + this.register_aggregate_kernel( + Constant.id(), + Some(UncompressedSizeInBytes.id()), + &ConstantUncompressedSizeKernel, + ); this.register_aggregate_kernel(Dict.id(), Some(MinMax.id()), &DictMinMaxKernel); this.register_aggregate_kernel(Dict.id(), Some(IsConstant.id()), &DictIsConstantKernel); this.register_aggregate_kernel(Dict.id(), Some(IsSorted.id()), &DictIsSortedKernel); diff --git a/vortex-array/src/arrays/constant/compute/mod.rs b/vortex-array/src/arrays/constant/compute/mod.rs index eec35ac0cbf..151ca705314 100644 --- a/vortex-array/src/arrays/constant/compute/mod.rs +++ b/vortex-array/src/arrays/constant/compute/mod.rs @@ -9,6 +9,7 @@ mod not; pub(crate) mod rules; mod slice; mod take; +pub(crate) mod uncompressed_size; #[cfg(test)] mod test { diff --git a/vortex-array/src/arrays/constant/compute/uncompressed_size.rs b/vortex-array/src/arrays/constant/compute/uncompressed_size.rs new file mode 100644 index 00000000000..53c7a366357 --- /dev/null +++ b/vortex-array/src/arrays/constant/compute/uncompressed_size.rs @@ -0,0 +1,128 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::aggregate_fn::AggregateFnRef; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::constant_uncompressed_size_in_bytes; +use crate::aggregate_fn::kernels::DynAggregateKernel; +use crate::arrays::Constant; +use crate::dtype::Nullability; +use crate::scalar::Scalar; + +#[derive(Debug)] +pub(crate) struct ConstantUncompressedSizeKernel; + +impl DynAggregateKernel for ConstantUncompressedSizeKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); 
+ } + + let Some(array) = batch.as_opt::() else { + return Ok(None); + }; + + let size = constant_uncompressed_size_in_bytes(array, ctx)?; + + Ok(Some(Scalar::primitive(size, Nullability::NonNullable))) + } +} + +#[cfg(test)] +mod tests { + use vortex_buffer::Buffer; + use vortex_error::VortexResult; + use vortex_error::vortex_err; + + use crate::ArrayRef; + use crate::Canonical; + use crate::IntoArray; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; + use crate::aggregate_fn::Accumulator; + use crate::aggregate_fn::DynAccumulator; + use crate::aggregate_fn::EmptyOptions; + use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; + use crate::aggregate_fn::fns::uncompressed_size_in_bytes::canonical_uncompressed_size_in_bytes; + use crate::arrays::BoolArray; + use crate::arrays::ConstantArray; + use crate::arrays::PrimitiveArray; + use crate::dtype::DType; + use crate::dtype::Nullability; + use crate::dtype::PType; + use crate::scalar::Scalar; + use crate::validity::Validity; + + fn aggregate(array: &ArrayRef) -> VortexResult { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut acc = + Accumulator::try_new(UncompressedSizeInBytes, EmptyOptions, array.dtype().clone())?; + acc.accumulate(array, &mut ctx)?; + acc.finish()? 
+ .as_primitive() + .typed_value::() + .ok_or_else(|| vortex_err!("uncompressed size result should not be null")) + } + + #[test] + fn constant_primitive_matches_primitive_array() -> VortexResult<()> { + let constant = ConstantArray::new(5i32, 10).into_array(); + let primitive = + PrimitiveArray::new(Buffer::full(5i32, 10), Validity::NonNullable).into_array(); + + assert_eq!(aggregate(&constant)?, aggregate(&primitive)?); + Ok(()) + } + + #[test] + fn nullable_constant_primitive_matches_nullable_primitive_array() -> VortexResult<()> { + let constant = + ConstantArray::new(Scalar::primitive(5i32, Nullability::Nullable), 10).into_array(); + let primitive = + PrimitiveArray::new(Buffer::full(5i32, 10), Validity::AllValid).into_array(); + + assert_eq!(aggregate(&constant)?, aggregate(&primitive)?); + Ok(()) + } + + #[test] + fn null_constant_primitive_matches_all_invalid_primitive_array() -> VortexResult<()> { + let dtype = DType::Primitive(PType::I32, Nullability::Nullable); + let constant = ConstantArray::new(Scalar::null(dtype), 10).into_array(); + let primitive = + PrimitiveArray::new(Buffer::::zeroed(10), Validity::AllInvalid).into_array(); + + assert_eq!(aggregate(&constant)?, aggregate(&primitive)?); + Ok(()) + } + + #[test] + fn constant_bool_uses_packed_canonical_size() -> VortexResult<()> { + let constant = ConstantArray::new(true, 10).into_array(); + let bool_array = BoolArray::from_iter([true; 10]).into_array(); + + assert_eq!(aggregate(&constant)?, aggregate(&bool_array)?); + assert_eq!(aggregate(&constant)?, 2); + Ok(()) + } + + #[test] + fn constant_utf8_matches_canonical_size() -> VortexResult<()> { + let constant = ConstantArray::new("abcdefghijkl".to_string(), 10).into_array(); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let canonical = constant.clone().execute::(&mut ctx)?; + let expected = canonical_uncompressed_size_in_bytes(&canonical, &mut ctx)?; + + assert_eq!(aggregate(&constant)?, expected); + Ok(()) + } +} From 
b7fb3d0862ba3ef4ed7edb13617cb03cfbf79861 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 7 May 2026 16:50:49 +0100 Subject: [PATCH 5/6] Bring back ListView rebuild for stats Signed-off-by: Adam Gutglick --- .../aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs index 39f73d9fbc0..919430195d9 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs @@ -9,6 +9,7 @@ use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::ListViewArray; use crate::arrays::listview::ListViewArrayExt; +use crate::arrays::listview::ListViewRebuildMode; pub(super) fn list_view_uncompressed_size_in_bytes( array: &ListViewArray, @@ -17,7 +18,8 @@ pub(super) fn list_view_uncompressed_size_in_bytes( let mut size = if array.is_empty() { 0 } else { - uncompressed_size_in_bytes_u64(array.elements(), ctx)? + let rebuilt = array.rebuild(ListViewRebuildMode::MakeExact)?; + uncompressed_size_in_bytes_u64(rebuilt.elements(), ctx)? 
}; let view_buffer_size = u64::try_from(array.len()) From 21d292bcda3f37e204f9b85bf562168d3d679349 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 7 May 2026 22:49:02 +0100 Subject: [PATCH 6/6] Define fn and fix some other stuff Signed-off-by: Adam Gutglick --- .../fns/uncompressed_size_in_bytes/mod.rs | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index aa166c6c478..848ed8c0850 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -42,8 +42,6 @@ use crate::array::ArrayView; use crate::arrays::Constant; use crate::arrays::ConstantArray; use crate::arrays::varbinview::BinaryView; -#[cfg(test)] -use crate::builders::builder_with_capacity; use crate::dtype::DType; use crate::dtype::DecimalType; use crate::dtype::Nullability::NonNullable; @@ -90,7 +88,7 @@ fn uncompressed_size_in_bytes_u64(array: &ArrayRef, ctx: &mut ExecutionCtx) -> V Ok(size) } -/// Sum the canonical, recursively **uncompressed** data size for an array. +/// The byte size of all buffers in children in their canonical representation. /// /// Applies to all types and returns a non-null `u64`. Encoding kernels can return this aggregate /// directly from metadata to avoid decoding arrays whose uncompressed size is known. 
@@ -308,15 +306,6 @@ fn supports_uncompressed_size_in_bytes(dtype: &DType) -> bool { } } -#[cfg(test)] -fn materialized_uncompressed_size_in_bytes(array: &ArrayRef) -> u64 { - let mut builder = builder_with_capacity(array.dtype(), array.len()); - unsafe { - builder.extend_from_array_unchecked(array); - } - builder.finish().nbytes() -} - pub(crate) fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { match validity { Mask::AllTrue(_) => Ok(0), @@ -345,7 +334,6 @@ mod tests { use crate::aggregate_fn::DynAccumulator; use crate::aggregate_fn::EmptyOptions; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; - use crate::aggregate_fn::fns::uncompressed_size_in_bytes::materialized_uncompressed_size_in_bytes; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes; use crate::arrays::BoolArray; use crate::arrays::ChunkedArray; @@ -359,6 +347,7 @@ mod tests { use crate::arrays::StructArray; use crate::arrays::VarBinViewArray; use crate::arrays::VariantArray; + use crate::builders::builder_with_capacity; use crate::dtype::DType; use crate::dtype::DecimalDType; use crate::dtype::FieldNames; @@ -373,6 +362,14 @@ mod tests { use crate::scalar::ScalarValue; use crate::validity::Validity; + fn materialized_uncompressed_size_in_bytes(array: &ArrayRef) -> u64 { + let mut builder = builder_with_capacity(array.dtype(), array.len()); + unsafe { + builder.extend_from_array_unchecked(array); + } + builder.finish().nbytes() + } + fn aggregate(array: &ArrayRef) -> VortexResult { let mut ctx = LEGACY_SESSION.create_execution_ctx(); let mut acc =