Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion fuzz/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ use vortex_array::scalar_fn::fns::operators::Operator;
use vortex_array::search_sorted::SearchResult;
use vortex_array::search_sorted::SearchSorted;
use vortex_array::search_sorted::SearchSortedSide;
use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, FloatCode, IntCode, StringCode};
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_btrblocks::BtrBlocksCompressorBuilder;
use vortex_btrblocks::FloatCode;
use vortex_btrblocks::IntCode;
use vortex_btrblocks::StringCode;
use vortex_error::VortexExpect;
use vortex_error::vortex_panic;
use vortex_mask::Mask;
Expand Down
380 changes: 380 additions & 0 deletions vortex-array/public-api.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion vortex-array/src/arrays/reversed/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;

use crate::ArrayRef;
use crate::DynArray;
use crate::dtype::DType;
use crate::stats::ArrayStats;
use crate::{ArrayRef, DynArray};

#[derive(Clone, Debug)]
pub struct ReversedArray {
Expand Down
6 changes: 6 additions & 0 deletions vortex-datafusion/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ pub fn vortex_datafusion::VortexSource::new(table_schema: datafusion_datasource:

pub fn vortex_datafusion::VortexSource::options(&self) -> &vortex_datafusion::VortexTableOptions

pub fn vortex_datafusion::VortexSource::reversed(&self) -> bool

pub fn vortex_datafusion::VortexSource::with_expression_convertor(self, expr_convertor: alloc::sync::Arc<dyn vortex_datafusion::ExpressionConvertor>) -> Self

pub fn vortex_datafusion::VortexSource::with_file_metadata_cache(self, file_metadata_cache: alloc::sync::Arc<dyn datafusion_execution::cache::cache_manager::FileMetadataCache>) -> Self
Expand All @@ -198,6 +200,8 @@ pub fn vortex_datafusion::VortexSource::with_options(self, opts: vortex_datafusi

pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled: bool) -> Self

pub fn vortex_datafusion::VortexSource::with_reversed(self, reversed: bool) -> Self

pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self

pub fn vortex_datafusion::VortexSource::with_segment_cache_builder(self, builder: alloc::sync::Arc<dyn vortex_layout::segments::cache::SegmentCacheBuilder>) -> Self
Expand Down Expand Up @@ -232,6 +236,8 @@ pub fn vortex_datafusion::VortexSource::try_pushdown_filters(&self, filters: all

pub fn vortex_datafusion::VortexSource::try_pushdown_projection(&self, projection: &datafusion_physical_expr::projection::ProjectionExprs) -> datafusion_common::error::Result<core::option::Option<alloc::sync::Arc<dyn datafusion_datasource::file::FileSource>>>

pub fn vortex_datafusion::VortexSource::try_reverse_output(&self, order: &[datafusion_physical_expr_common::sort_expr::PhysicalSortExpr], eq_properties: &datafusion_physical_expr::equivalence::properties::EquivalenceProperties) -> datafusion_common::error::Result<datafusion_physical_plan::sort_pushdown::SortOrderPushdownResult<alloc::sync::Arc<dyn datafusion_datasource::file::FileSource>>>

pub fn vortex_datafusion::VortexSource::with_batch_size(&self, batch_size: usize) -> alloc::sync::Arc<dyn datafusion_datasource::file::FileSource>

pub struct vortex_datafusion::VortexTableOptions
Expand Down
266 changes: 266 additions & 0 deletions vortex-datafusion/src/persistent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,272 @@ mod tests {
Ok(())
}

/// Helper: render a physical plan for the given SQL and assert that it (does not) contain
/// a SortExec — i.e. that the source has absorbed the requested ordering.
async fn plan_tree(ctx: &TestSessionContext, sql: &str) -> anyhow::Result<String> {
let df = ctx.session.sql(sql).await?;
let physical_plan = ctx
.session
.state()
.create_physical_plan(df.logical_plan())
.await?;
Ok(DisplayableExecutionPlan::new(physical_plan.as_ref())
.tree_render()
.to_string())
}

#[tokio::test]
async fn reverse_pushdown_swaps_file_order() -> anyhow::Result<()> {
let ctx = TestSessionContext::default();

// Each insert produces a separate file. ts is monotonically increasing across files,
// so DataFusion's planner accepts `ts ASC` as the table's output ordering.
ctx.session
.sql(
"CREATE EXTERNAL TABLE ts_tbl \
(ts INT NOT NULL) \
STORED AS vortex \
WITH ORDER (ts ASC) \
LOCATION '/ts/'",
)
.await?;

let data = [vec![1, 2, 3], vec![10, 11, 12], vec![100, 101, 102]];

for chunk in data {
let values = chunk
.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join("),(");
ctx.session
.sql(&format!("INSERT INTO ts_tbl VALUES ({values})"))
.await?
.collect()
.await?;
}

// We return Inexact, so DataFusion keeps a SortExec on top, but the source itself
// is reversed (advertised via fmt_extra). The TopK below the SortPreservingMergeExec
// also confirms LIMIT pushed down through the new plan.
let tree = plan_tree(&ctx, "SELECT ts FROM ts_tbl ORDER BY ts DESC LIMIT 2").await?;
assert!(
tree.contains("reversed: true"),
"expected the source to be reversed, plan was:\n{tree}",
);
assert!(
tree.contains("SortExec(TopK)"),
"expected a TopK sort after reverse pushdown, plan was:\n{tree}",
);

let rows = ctx
.session
.sql("SELECT ts FROM ts_tbl ORDER BY ts DESC LIMIT 2")
.await?
.collect()
.await?;
let formatted = pretty_format_batches(&rows)?.to_string();
assert_snapshot!(formatted, @r"
+-----+
| ts |
+-----+
| 102 |
| 101 |
+-----+
");

Ok(())
}

#[tokio::test]
async fn reverse_pushdown_rejected_for_unrelated_ordering() -> anyhow::Result<()> {
let ctx = TestSessionContext::default();

ctx.session
.sql(
"CREATE EXTERNAL TABLE ab_tbl \
(a INT NOT NULL, b INT NOT NULL) \
STORED AS vortex \
WITH ORDER (a ASC) \
LOCATION '/ab/'",
)
.await?;

ctx.session
.sql("INSERT INTO ab_tbl VALUES (1, 10), (2, 20)")
.await?
.collect()
.await?;

// ORDER BY b DESC is unrelated to the table's declared ordering on `a`. The source
// must refuse to push down, leaving a SortExec in the plan and `reversed: true`
// absent from the source.
let tree = plan_tree(&ctx, "SELECT a, b FROM ab_tbl ORDER BY b DESC LIMIT 1").await?;
assert!(
tree.contains("Sort"),
"expected a Sort node when the requested ordering is unrelated, plan was:\n{tree}",
);
assert!(
!tree.contains("reversed: true"),
"expected the source to not be reversed, plan was:\n{tree}",
);

Ok(())
}

#[tokio::test]
async fn reverse_pushdown_no_ordering_declared() -> anyhow::Result<()> {
let ctx = TestSessionContext::default();

ctx.session
.sql(
"CREATE EXTERNAL TABLE noord_tbl \
(ts INT NOT NULL) \
STORED AS vortex \
LOCATION '/noord/'",
)
.await?;

ctx.session
.sql("INSERT INTO noord_tbl VALUES (3), (1), (2)")
.await?
.collect()
.await?;

let tree = plan_tree(&ctx, "SELECT ts FROM noord_tbl ORDER BY ts DESC LIMIT 1").await?;
assert!(
tree.contains("Sort"),
"expected a Sort node when no WITH ORDER is declared, plan was:\n{tree}",
);
assert!(
!tree.contains("reversed: true"),
"expected the source to not be reversed, plan was:\n{tree}",
);

Ok(())
}

#[tokio::test]
async fn reverse_pushdown_then_filter() -> anyhow::Result<()> {
let ctx = TestSessionContext::default();

ctx.session
.sql(
"CREATE EXTERNAL TABLE filt_tbl \
(ts INT NOT NULL) \
STORED AS vortex \
WITH ORDER (ts ASC) \
LOCATION '/filt/'",
)
.await?;

for chunk in [vec![1, 2, 3], vec![10, 11, 12], vec![100, 101, 102]] {
let values = chunk
.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join("),(");
ctx.session
.sql(&format!("INSERT INTO filt_tbl VALUES ({values})"))
.await?
.collect()
.await?;
}

// Filter pushdown and reverse sort pushdown should coexist.
let rows = ctx
.session
.sql("SELECT ts FROM filt_tbl WHERE ts < 50 ORDER BY ts DESC LIMIT 2")
.await?
.collect()
.await?;

let formatted = pretty_format_batches(&rows)?.to_string();
assert_snapshot!(formatted, @r"
+----+
| ts |
+----+
| 12 |
| 11 |
+----+
");

Ok(())
}

#[tokio::test]
async fn order_by_desc_limit_returns_largest_rows() -> anyhow::Result<()> {
// Synergy test: combines Part 1 (limit pushdown with filter) and Part 2 (reverse
// sort pushdown). With files holding non-overlapping ts ranges, `ORDER BY ts DESC
// LIMIT k` should produce the largest k ts values, reading them from the last file
// (after reverse pushdown swaps file order, the last file becomes the first scanned).
let ctx = TestSessionContext::default();

ctx.session
.sql(
"CREATE EXTERNAL TABLE big_tbl \
(ts INT NOT NULL, payload INT NOT NULL) \
STORED AS vortex \
WITH ORDER (ts ASC) \
LOCATION '/big/'",
)
.await?;

// Five files, each with 100 rows. ts values across all files run 0..500
// monotonically increasing, with the largest (400..500) in the last file.
for file_idx in 0..5 {
let base = file_idx * 100;
let values = (0..100)
.map(|i| format!("({}, {})", base + i, (base + i) * 2))
.collect::<Vec<_>>()
.join(", ");
ctx.session
.sql(&format!("INSERT INTO big_tbl VALUES {values}"))
.await?
.collect()
.await?;
}

// Filter rules out ts < 50, then ORDER BY ts DESC LIMIT 10 should yield ts in
// [490..500], coming entirely from the last file.
let rows = ctx
.session
.sql("SELECT ts FROM big_tbl WHERE ts >= 50 ORDER BY ts DESC LIMIT 10")
.await?
.collect()
.await?;
let formatted = pretty_format_batches(&rows)?.to_string();
assert_snapshot!(formatted, @r"
+-----+
| ts |
+-----+
| 499 |
| 498 |
| 497 |
| 496 |
| 495 |
| 494 |
| 493 |
| 492 |
| 491 |
| 490 |
+-----+
");

// Confirm the source was marked reversed.
let tree = plan_tree(
&ctx,
"SELECT ts FROM big_tbl WHERE ts >= 50 ORDER BY ts DESC LIMIT 10",
)
.await?;
assert!(
tree.contains("reversed: true"),
"expected reverse pushdown to engage end-to-end, plan was:\n{tree}",
);

Ok(())
}

/// Doc example: demonstrates creating, writing, reading, and filtering a Vortex table.
#[tokio::test]
async fn doc_example() -> anyhow::Result<()> {
Expand Down
11 changes: 4 additions & 7 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ use vortex::error::VortexError;
use vortex::file::OpenOptionsSessionExt;
use vortex::io::InstrumentedReadAt;
use vortex::layout::LayoutReader;
use vortex::layout::segments::FileIdentity;
use vortex::layout::segments::FileVersion;
use vortex::layout::segments::SegmentCacheBuilder;
use vortex::metrics::Label;
use vortex::metrics::MetricsRegistry;
use vortex::scan::ScanBuilder;
Expand All @@ -54,10 +57,6 @@ use crate::convert::exprs::make_vortex_predicate;
use crate::convert::schema::calculate_physical_schema;
use crate::metrics::PARTITION_LABEL;
use crate::metrics::PATH_LABEL;
use vortex::layout::segments::FileIdentity;
use vortex::layout::segments::FileVersion;
use vortex::layout::segments::SegmentCacheBuilder;

use crate::persistent::cache::CachedVortexMetadata;
use crate::persistent::reader::VortexReaderFactory;
use crate::persistent::stream::PrunableStream;
Expand Down Expand Up @@ -360,9 +359,7 @@ impl FileOpener for VortexOpener {
})
.transpose()?;

if let Some(limit) = limit
&& filter.is_none()
{
if let Some(limit) = limit {
scan_builder = scan_builder.with_limit(limit);
}

Expand Down
Loading
Loading