Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions vortex-array/src/array_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::future::Future;
use std::ops::Range;
use std::sync::Arc;

use futures::FutureExt;
use futures::TryFutureExt;
use futures::future::BoxFuture;
use futures::future::Shared;
use vortex_error::SharedVortexResult;
use vortex_error::VortexError;
use vortex_error::VortexResult;
use vortex_error::vortex_panic;

use crate::ArrayRef;

/// A future that resolves to an array with a known length.
#[derive(Clone)]
pub struct ArrayFuture {
inner: Shared<BoxFuture<'static, SharedVortexResult<ArrayRef>>>,
len: usize,
}

impl ArrayFuture {
/// Create a new `ArrayFuture` from a future that returns an array.
pub fn new<F>(len: usize, fut: F) -> Self
where
F: Future<Output = VortexResult<ArrayRef>> + Send + 'static,
{
Self {
inner: fut
.inspect(move |r| {
if let Ok(array) = r
&& array.len() != len {
vortex_panic!("ArrayFuture created with future that returned array of incorrect length (expected {}, got {})", len, array.len());
}
})
.map_err(Arc::new)
.boxed()
.shared(),
len,
}
}

/// Create an `ArrayFuture` from an already-resolved array.
pub fn ready(array: ArrayRef) -> Self {
let len = array.len();
Self::new(len, async move { Ok(array) })
}

/// Returns the length of the array.
pub fn len(&self) -> usize {
self.len
}

/// Returns true if the array is empty.
pub fn is_empty(&self) -> bool {
self.len == 0
}

/// Create an `ArrayFuture` that resolves to a slice of the original array.
pub fn slice(&self, range: Range<usize>) -> Self {
let inner = self.inner.clone();
let parent_len = self.len;
Self::new(range.len(), async move {
let array = inner.await?;
debug_assert!(range.end <= parent_len, "slice range out of bounds");
let _ = parent_len;
array.slice(range)
})
}
}

impl Future for ArrayFuture {
type Output = VortexResult<ArrayRef>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.inner.poll_unpin(cx).map_err(VortexError::from)
}
}
3 changes: 3 additions & 0 deletions vortex-array/src/expr/exprs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub(crate) mod operators;
pub(crate) mod pack;
pub(crate) mod root;
pub(crate) mod select;
pub(crate) mod stats;

pub use between::*;
pub use binary::*;
pub use cast::*;
Expand All @@ -33,3 +35,4 @@ pub use operators::*;
pub use pack::*;
pub use root::*;
pub use select::*;
pub use stats::*;
96 changes: 96 additions & 0 deletions vortex-array/src/expr/exprs/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_scalar::Scalar;

use crate::Array;
use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::ConstantArray;
use crate::expr::Arity;
use crate::expr::ChildName;
use crate::expr::ExecutionArgs;
use crate::expr::ExprId;
use crate::expr::Expression;
use crate::expr::SimplifyCtx;
use crate::expr::VTable;
use crate::expr::VTableExt;
use crate::expr::stats::Precision;
use crate::expr::stats::Stat;
use crate::expr::stats::StatsProvider;

/// Creates a new expression that returns a minimum bound of its input.
pub fn statistic(stat: Stat, child: Expression) -> Expression {
Statistic.new_expr(stat, vec![child])
}

pub struct Statistic;

impl VTable for Statistic {
type Options = Stat;

fn id(&self) -> ExprId {
ExprId::from("statistic")
}

fn arity(&self, _options: &Self::Options) -> Arity {
Arity::Exact(1)
}

fn child_name(&self, _options: &Self::Options, _child_idx: usize) -> ChildName {
ChildName::from("input")
}

fn return_dtype(&self, stat: &Stat, arg_dtypes: &[DType]) -> VortexResult<DType> {
stat.dtype(&arg_dtypes[0])
.ok_or_else(|| {
vortex_err!(
"statistic {:?} not supported for dtype {:?}",
stat,
arg_dtypes[0]
)
})
// We make all statistics types nullable in case there is no reduction rule to handle
// the statistic expression.
.map(|dt| dt.as_nullable())
}

fn execute(&self, stat: &Stat, args: ExecutionArgs) -> VortexResult<ArrayRef> {
// FIXME(ngates): we should implement this as a reduction rule instead?
let Some(stat_dtype) = stat.dtype(args.inputs[0].dtype()) else {
vortex_bail!(
"Statistic {:?} not supported for dtype {:?}",
stat,
args.inputs[0].dtype()
);
};

Ok(match args.inputs[0].statistics().get(*stat) {
// TODO(ngates): do we care about precision here? Possibly we should configure in the
// options of the expression.
Some(Precision::Exact(v)) => {
// We have an exact value for the statistic; so we return a constant array
// with that value.
ConstantArray::new(v, args.row_count).into_array()
}
None | Some(Precision::Inexact(_)) => {
ConstantArray::new(Scalar::null(stat_dtype), args.row_count).into_array()
}
})
}

fn simplify(
&self,
_options: &Self::Options,
_expr: &Expression,
_ctx: &dyn SimplifyCtx,
) -> VortexResult<Option<Expression>> {
// FIXME(ngates): we really want to implement a reduction rule for all arrays? But it's an array.
// And it's a reduction rule. How do we do this without reduce_parent on everything..?
Ok(None)
}
}
2 changes: 1 addition & 1 deletion vortex-array/src/expr/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl Stat {
})
}

pub fn name(&self) -> &str {
pub const fn name(&self) -> &'static str {
match self {
Self::IsConstant => "is_constant",
Self::IsSorted => "is_sorted",
Expand Down
23 changes: 22 additions & 1 deletion vortex-array/src/expr/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,20 @@ pub trait VTable: 'static + Sized + Send + Sync {
options: &Self::Options,
expr: &Expression,
f: &mut Formatter<'_>,
) -> fmt::Result;
) -> fmt::Result {
write!(f, "{}(", expr.id())?;
for (i, child) in expr.children().iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
child.fmt_sql(f)?;
}
let options = format!("{}", options);
if !options.is_empty() {
write!(f, ", options={}", options)?;
}
write!(f, ")")
}

/// Compute the return [`DType`] of the expression if evaluated over the given input types.
fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult<DType>;
Expand Down Expand Up @@ -136,6 +149,14 @@ pub trait VTable: 'static + Sized + Send + Sync {
Ok(None)
}

/// Falsify the expression, returning a new expression that is true whenever the original
/// expression is guaranteed to be false via stats.
fn falsify(&self, options: &Self::Options, expr: &Expression) -> Option<Expression> {
_ = options;
_ = expr;
None
}

/// See [`Expression::stat_falsification`].
fn stat_falsification(
&self,
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use std::sync::LazyLock;

pub use array::*;
pub use array_future::*;
pub use canonical::*;
pub use columnar::*;
pub use context::*;
Expand All @@ -31,6 +32,7 @@ pub mod accessor;
#[doc(hidden)]
pub mod aliases;
mod array;
mod array_future;
pub mod arrays;
pub mod arrow;
pub mod buffer;
Expand Down
53 changes: 35 additions & 18 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ use vortex::array::ArrayRef;
use vortex::array::VortexSessionExecute;
use vortex::array::arrow::ArrowArrayExecutor;
use vortex::error::VortexError;
use vortex::error::VortexExpect;
use vortex::file::OpenOptionsSessionExt;
use vortex::io::InstrumentedReadAt;
use vortex::io::session::RuntimeSessionExt;
use vortex::layout::LayoutReader;
use vortex::metrics::VortexMetrics;
use vortex::scan::ScanBuilder;
use vortex::scan::v2::scan::ScanBuilder2;
use vortex::session::VortexSession;
use vortex_utils::aliases::dash_map::DashMap;
use vortex_utils::aliases::dash_map::Entry;
Expand Down Expand Up @@ -261,21 +264,28 @@ impl FileOpener for VortexOpener {
}
};

let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
// let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
let mut scan_builder = ScanBuilder2::new(
vxf.footer()
.layout()
.new_reader2(&vxf.segment_source(), &session)
.vortex_expect("Failed to create footer"),
session.clone(),
);

if let Some(extensions) = file.extensions
&& let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
{
scan_builder = vortex_plan.apply_to_builder(scan_builder);
}
// if let Some(extensions) = file.extensions
// && let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
// {
// scan_builder = vortex_plan.apply_to_builder(scan_builder);
// }

if let Some(file_range) = file.range {
scan_builder = apply_byte_range(
file_range,
file.object_meta.size,
vxf.row_count(),
scan_builder,
);
// scan_builder = apply_byte_range(
// file_range,
// file.object_meta.size,
// vxf.row_count(),
// scan_builder,
// );
}

let filter = filter
Expand Down Expand Up @@ -317,17 +327,24 @@ impl FileOpener for VortexOpener {
scan_builder = scan_builder.with_limit(limit);
}

let handle = session.handle().clone();
let stream = scan_builder
.with_metrics(metrics)
// .with_metrics(metrics)
.with_projection(scan_projection)
.with_some_filter(filter)
.with_ordered(has_output_ordering)
.map(move |chunk| {
// .with_ordered(has_output_ordering)
.into_array_stream()
.vortex_expect("Failed to execute Vortex scan")
.then(move |chunk| {
let mut ctx = session.create_execution_ctx();
chunk.execute_record_batch(&stream_schema, &mut ctx)
let stream_schema = stream_schema.clone();
handle.spawn_cpu(move || {
chunk
.vortex_expect("failed")
.execute_record_batch(&stream_schema, &mut ctx)
})
})
.into_stream()
.map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))?
// .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))?
.map_ok(move |rb| {
// We try and slice the stream into respecting datafusion's configured batch size.
stream::iter(
Expand Down
9 changes: 9 additions & 0 deletions vortex-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use vortex_layout::LayoutReader;
use vortex_layout::segments::SegmentSource;
use vortex_scan::ScanBuilder;
use vortex_scan::SplitBy;
use vortex_scan::v2::scan::ScanBuilder2;
use vortex_session::VortexSession;
use vortex_utils::aliases::hash_map::HashMap;

Expand Down Expand Up @@ -95,6 +96,14 @@ impl VortexFile {
))
}

pub fn scan2(&self) -> VortexResult<ScanBuilder2> {
let reader_ref = self
.footer
.layout()
.new_reader2(&self.segment_source, &self.session)?;
Ok(ScanBuilder2::new(reader_ref, self.session.clone()))
}

/// Returns true if the expression will never match any rows in the file.
pub fn can_prune(&self, filter: &Expression) -> VortexResult<bool> {
let Some((stats, fields)) = self
Expand Down
Loading