Skip to content
144 changes: 124 additions & 20 deletions vortex-array/src/patches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::dtype::Nullability;
use crate::dtype::Nullability::NonNullable;
use crate::dtype::PType;
use crate::dtype::UnsignedPType;
use crate::expr::stats::Precision;
use crate::expr::stats::Stat;
use crate::match_each_integer_ptype;
use crate::match_each_unsigned_integer_ptype;
use crate::scalar::PValue;
Expand Down Expand Up @@ -175,16 +177,27 @@ impl Patches {
// Perform validation of components when they are host-resident.
// This is not possible to do eagerly when the data is on GPU memory.
if indices.is_host() && values.is_host() {
let max = usize::try_from(&indices.execute_scalar(
let last = indices.execute_scalar(
indices.len() - 1,
&mut LEGACY_SESSION.create_execution_ctx(),
)?)
.map_err(|_| vortex_err!("indices must be a number"))?;
)?;
let max =
usize::try_from(&last).map_err(|_| vortex_err!("indices must be a number"))?;
vortex_ensure!(
max - offset < array_len,
"Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
);

// Seed Min/Max stats on indices so search_index can short-circuit
// out-of-range lookups, and so pruning/predicate-pushdown consumers
// see populated bounds.
let first = indices.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?;
if let (Some(min_value), Some(max_value)) = (first.value(), last.value()) {
let stats = indices.statistics();
stats.set(Stat::Min, Precision::Exact(min_value.clone()));
stats.set(Stat::Max, Precision::Exact(max_value.clone()));
}

#[cfg(debug_assertions)]
{
use crate::VortexSessionExecute;
Expand Down Expand Up @@ -394,13 +407,39 @@ impl Patches {
/// [`SearchResult::Found(patch_idx)`]: SearchResult::Found
/// [`SearchResult::NotFound(insertion_point)`]: SearchResult::NotFound
pub fn search_index(&self, index: usize) -> VortexResult<SearchResult> {
if let Some(result) = self.search_index_out_of_range(index) {
return Ok(result);
}

if self.chunk_offsets.is_some() {
return self.search_index_chunked(index);
}

Self::search_index_binary_search(&self.indices, index + self.offset)
}

#[inline]
fn search_index_out_of_range(&self, index: usize) -> Option<SearchResult> {
let (min, max) = self.cached_bounds()?;
if index < min {
Some(SearchResult::NotFound(0))
} else if index > max {
Some(SearchResult::NotFound(self.indices.len()))
} else {
None
}
}

#[inline]
fn cached_bounds(&self) -> Option<(usize, usize)> {
let offset = self.offset;
self.indices.statistics().with_typed_stats_set(|typed| {
let raw_min = usize::try_from(typed.get_value(Stat::Min)?.as_exact()?).ok()?;
let raw_max = usize::try_from(typed.get_value(Stat::Max)?.as_exact()?).ok()?;
Some((raw_min.checked_sub(offset)?, raw_max.checked_sub(offset)?))
})
}

/// Binary searches for `needle` in the indices array.
///
/// # Returns
Expand Down Expand Up @@ -551,29 +590,34 @@ impl Patches {
})
}

/// Returns the minimum patch index
/// Returns the minimum patch index.
pub fn min_index(&self) -> VortexResult<usize> {
let first = self
if let Some((min, _)) = self.cached_bounds() {
return Ok(min);
}
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let raw: usize = self
.indices
.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?
.as_primitive()
.as_::<usize>()
.ok_or_else(|| vortex_err!("index does not fit in usize"))?;
Ok(first - self.offset)
.statistics()
.compute_min(&mut ctx)
.ok_or_else(|| vortex_err!("min index unavailable"))?;
raw.checked_sub(self.offset)
.ok_or_else(|| vortex_err!("offset {} exceeds min index {}", self.offset, raw))
}

/// Returns the maximum patch index
/// Returns the maximum patch index.
pub fn max_index(&self) -> VortexResult<usize> {
let last = self
if let Some((_, max)) = self.cached_bounds() {
return Ok(max);
}
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let raw: usize = self
.indices
.execute_scalar(
self.indices.len() - 1,
&mut LEGACY_SESSION.create_execution_ctx(),
)?
.as_primitive()
.as_::<usize>()
.ok_or_else(|| vortex_err!("index does not fit in usize"))?;
Ok(last - self.offset)
.statistics()
.compute_max(&mut ctx)
.ok_or_else(|| vortex_err!("max index unavailable"))?;
raw.checked_sub(self.offset)
.ok_or_else(|| vortex_err!("offset {} exceeds max index {}", self.offset, raw))
}

/// Filter the patches by a mask, resulting in new patches for the filtered array.
Expand Down Expand Up @@ -1750,6 +1794,66 @@ mod test {
assert_eq!(patches.search_index(9).unwrap(), SearchResult::NotFound(3));
}

#[test]
fn test_search_index_out_of_range_fast_path() {
let patches = Patches::new(
100,
0,
buffer![10u64, 20, 30, 40].into_array(),
buffer![1i32, 2, 3, 4].into_array(),
None,
)
.unwrap();

assert_eq!(
patches.search_index_out_of_range(0),
Some(SearchResult::NotFound(0))
);
assert_eq!(
patches.search_index_out_of_range(9),
Some(SearchResult::NotFound(0))
);
assert_eq!(
patches.search_index_out_of_range(41),
Some(SearchResult::NotFound(4))
);
assert_eq!(
patches.search_index_out_of_range(99),
Some(SearchResult::NotFound(4))
);
assert_eq!(patches.search_index_out_of_range(10), None);
assert_eq!(patches.search_index_out_of_range(25), None);
assert_eq!(patches.search_index_out_of_range(40), None);

assert_eq!(patches.search_index(5).unwrap(), SearchResult::NotFound(0));
assert_eq!(patches.search_index(50).unwrap(), SearchResult::NotFound(4));
}

#[test]
fn test_search_index_out_of_range_with_offset() {
let patches = Patches::new(
100,
0,
buffer![10u64, 50, 90].into_array(),
buffer![1i32, 2, 3].into_array(),
None,
)
.unwrap();
let sliced = patches.slice(40..95).unwrap().unwrap();

assert_eq!(sliced.min_index().unwrap(), 10);
assert_eq!(sliced.max_index().unwrap(), 50);
assert_eq!(
sliced.search_index_out_of_range(5),
Some(SearchResult::NotFound(0))
);
assert_eq!(
sliced.search_index_out_of_range(54),
Some(SearchResult::NotFound(2))
);
assert_eq!(sliced.search_index_out_of_range(30), None);
}

#[test]
fn test_mask_boundary_patches() {
// Test masking patches at array boundaries
Expand Down
77 changes: 45 additions & 32 deletions vortex-array/src/stats/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::sync::Arc;

use parking_lot::RwLock;
use arc_swap::ArcSwap;
use vortex_array::ExecutionCtx;
use vortex_error::VortexError;
use vortex_error::VortexResult;
Expand All @@ -32,9 +32,20 @@ use crate::scalar::ScalarValue;

/// A shared [`StatsSet`] stored in an array. Can be shared by copies of the array and can also be mutated in place.
// TODO(adamg): This is a very bad name.
#[derive(Clone, Default, Debug)]
#[derive(Clone, Debug)]
pub struct ArrayStats {
inner: Arc<RwLock<StatsSet>>,
// Lock-free reads via copy-on-write. Writes are last-writer-wins;
// concurrent writers may lose updates, which is acceptable for stats
// (they're hints and can be recomputed).
inner: Arc<ArcSwap<StatsSet>>,
}

impl Default for ArrayStats {
fn default() -> Self {
Self {
inner: Arc::new(ArcSwap::from_pointee(StatsSet::default())),
}
}
}

/// Reference to an array's [`StatsSet`]. Can be used to get and mutate the underlying stats.
Expand All @@ -55,42 +66,49 @@ impl ArrayStats {
}

pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
self.inner.write().set(stat, value);
let mut new_stats = (**self.inner.load()).clone();
new_stats.set(stat, value);
self.inner.store(Arc::new(new_stats));
}

pub fn clear(&self, stat: Stat) {
self.inner.write().clear(stat);
let mut new_stats = (**self.inner.load()).clone();
new_stats.clear(stat);
self.inner.store(Arc::new(new_stats));
}

pub fn retain(&self, stats: &[Stat]) {
self.inner.write().retain_only(stats);
let mut new_stats = (**self.inner.load()).clone();
new_stats.retain_only(stats);
self.inner.store(Arc::new(new_stats));
}
}

impl From<StatsSet> for ArrayStats {
fn from(value: StatsSet) -> Self {
Self {
inner: Arc::new(RwLock::new(value)),
inner: Arc::new(ArcSwap::from_pointee(value)),
}
}
}

impl From<ArrayStats> for StatsSet {
fn from(value: ArrayStats) -> Self {
value.inner.read().clone()
(**value.inner.load()).clone()
}
}

impl StatsSetRef<'_> {
pub(crate) fn replace(&self, stats: StatsSet) {
*self.array_stats.inner.write() = stats;
self.array_stats.inner.store(Arc::new(stats));
}

pub fn set_iter(&self, iter: StatsSetIntoIter) {
let mut guard = self.array_stats.inner.write();
let mut new_stats = (**self.array_stats.inner.load()).clone();
for (stat, value) in iter {
guard.set(stat, value);
new_stats.set(stat, value);
}
self.array_stats.inner.store(Arc::new(new_stats));
}

pub fn inherit_from(&self, stats: StatsSetRef<'_>) {
Expand All @@ -101,38 +119,33 @@ impl StatsSetRef<'_> {
}

pub fn inherit<'a>(&self, iter: impl Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) {
let mut guard = self.array_stats.inner.write();
let mut new_stats = (**self.array_stats.inner.load()).clone();
for (stat, value) in iter {
if !value.is_exact() {
if !guard.get(*stat).is_some_and(|v| v.is_exact()) {
guard.set(*stat, value.clone());
if !new_stats.get(*stat).is_some_and(|v| v.is_exact()) {
new_stats.set(*stat, value.clone());
}
} else {
guard.set(*stat, value.clone());
new_stats.set(*stat, value.clone());
}
}
self.array_stats.inner.store(Arc::new(new_stats));
}

pub fn with_typed_stats_set<U, F: FnOnce(TypedStatsSetRef) -> U>(&self, apply: F) -> U {
apply(
self.array_stats
.inner
.read()
.as_typed_ref(self.dyn_array_ref.dtype()),
)
let snapshot = self.array_stats.inner.load();
apply(snapshot.as_typed_ref(self.dyn_array_ref.dtype()))
}

pub fn with_mut_typed_stats_set<U, F: FnOnce(MutTypedStatsSetRef) -> U>(&self, apply: F) -> U {
apply(
self.array_stats
.inner
.write()
.as_mut_typed_ref(self.dyn_array_ref.dtype()),
)
let mut new_stats = (**self.array_stats.inner.load()).clone();
let result = apply(new_stats.as_mut_typed_ref(self.dyn_array_ref.dtype()));
self.array_stats.inner.store(Arc::new(new_stats));
result
}

pub fn to_owned(&self) -> StatsSet {
self.array_stats.inner.read().clone()
(**self.array_stats.inner.load()).clone()
}

/// Returns a clone of the underlying [`ArrayStats`].
Expand All @@ -149,8 +162,8 @@ impl StatsSetRef<'_> {
&self,
f: F,
) -> R {
let lock = self.array_stats.inner.read();
f(&mut lock.iter())
let snapshot = self.array_stats.inner.load();
f(&mut snapshot.iter())
}

pub fn compute_stat(&self, stat: Stat, ctx: &mut ExecutionCtx) -> VortexResult<Option<Scalar>> {
Expand Down Expand Up @@ -288,12 +301,12 @@ impl StatsProvider for StatsSetRef<'_> {
fn get(&self, stat: Stat) -> Option<Precision<Scalar>> {
self.array_stats
.inner
.read()
.load()
.as_typed_ref(self.dyn_array_ref.dtype())
.get(stat)
}

fn len(&self) -> usize {
self.array_stats.inner.read().len()
self.array_stats.inner.load().len()
}
}
15 changes: 15 additions & 0 deletions vortex-array/src/stats/stats_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ impl StatsSet {
.map(|(_, v)| v.clone())
}

/// Borrow the value for a given stat without cloning the underlying `ScalarValue`.
pub fn get_value(&self, stat: Stat) -> Option<&Precision<ScalarValue>> {
self.values
.iter()
.find(|(s, _)| *s == stat)
.map(|(_, v)| v)
}

/// Length of the stats set
pub fn len(&self) -> usize {
self.values.len()
Expand Down Expand Up @@ -225,6 +233,13 @@ pub struct TypedStatsSetRef<'a, 'b> {
pub dtype: &'b DType,
}

impl<'a, 'b> TypedStatsSetRef<'a, 'b> {
/// Borrow the value for a given stat without constructing a [`Scalar`].
pub fn get_value(&self, stat: Stat) -> Option<Precision<&'a ScalarValue>> {
self.values.get_value(stat).map(|p| p.as_ref())
}
}

impl StatsProvider for TypedStatsSetRef<'_, '_> {
fn get(&self, stat: Stat) -> Option<Precision<Scalar>> {
self.values.get(stat).map(|p| {
Expand Down
Loading