Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 241 additions & 9 deletions vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp::Ordering;
use std::fmt::Formatter;
use std::ops::Range;
use std::sync::Arc;
use std::sync::Weak;

use datafusion_common::Result as DFResult;
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
Expand All @@ -17,9 +20,11 @@ use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::PhysicalExpr;
use datafusion_physical_plan::SortOrderPushdownResult;
Expand Down Expand Up @@ -200,7 +205,7 @@ pub struct VortexSource {
natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
expression_convertor: Arc<dyn ExpressionConvertor>,
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
pub(crate) ordered: bool,
pub(crate) sort_order: Option<LexOrdering>,
vx_metrics_registry: Arc<dyn MetricsRegistry>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Options controlling scan planning and execution behavior.
Expand Down Expand Up @@ -235,7 +240,7 @@ impl VortexSource {
vortex_reader_factory: None,
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
file_metadata_cache: None,
ordered: false,
sort_order: None,
options: VortexTableOptions::default(),
}
}
Expand Down Expand Up @@ -362,7 +367,7 @@ impl VortexSource {
metrics_registry: Arc::clone(&self.vx_metrics_registry),
layout_readers: Arc::clone(&self.layout_readers),
natural_split_ranges: Arc::clone(&self.natural_split_ranges),
has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered,
has_output_ordering: !base_config.output_ordering.is_empty(),
expression_convertor: Arc::clone(&self.expression_convertor),
file_metadata_cache: self.file_metadata_cache.clone(),
projection_pushdown: self.options.projection_pushdown,
Expand Down Expand Up @@ -414,16 +419,50 @@ impl FileSource for VortexSource {
return Ok(SortOrderPushdownResult::Unsupported);
}

if eq_properties.ordering_satisfy(order.iter().cloned())? {
let mut this = self.clone();
this.ordered = true;
let Some(sort_order) = LexOrdering::new(order.iter().cloned()) else {
return Ok(SortOrderPushdownResult::Unsupported);
};

if sort_reorder_key(&sort_order, &self.table_schema).is_none() {
return Ok(SortOrderPushdownResult::Unsupported);
}

let mut this = self.clone();
this.sort_order = Some(sort_order);

if eq_properties.ordering_satisfy(order.iter().cloned())? {
return Ok(SortOrderPushdownResult::Exact {
inner: Arc::new(this) as Arc<dyn FileSource>,
});
}

Ok(SortOrderPushdownResult::Unsupported)
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(this) as Arc<dyn FileSource>,
})
}

fn reorder_files(&self, mut files: Vec<PartitionedFile>) -> Vec<PartitionedFile> {
let Some((col_idx, descending)) = self
.sort_order
.as_ref()
.and_then(|sort_order| sort_reorder_key(sort_order, &self.table_schema))
else {
return files;
};

files.sort_unstable_by(|a, b| {
match (file_min_value(a, col_idx), file_min_value(b, col_idx)) {
(Some(va), Some(vb)) => {
let cmp = va.partial_cmp(vb).unwrap_or(Ordering::Equal);
if descending { cmp.reverse() } else { cmp }
}
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
}
});

files
}

fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
Expand Down Expand Up @@ -542,13 +581,35 @@ impl FileSource for VortexSource {
}
}

fn sort_reorder_key(sort_order: &LexOrdering, table_schema: &TableSchema) -> Option<(usize, bool)> {
let first = sort_order.first();
let col = first.expr.downcast_ref::<Column>()?;
let col_idx = table_schema.file_schema().index_of(col.name()).ok()?;
Some((col_idx, first.options.descending))
}

fn file_min_value(file: &PartitionedFile, col_idx: usize) -> Option<&ScalarValue> {
file.statistics
.as_ref()?
.column_statistics
.get(col_idx)?
.min_value
.get_value()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Schema;
use datafusion_common::ColumnStatistics;
use datafusion_common::ScalarValue;
use datafusion_common::Statistics;
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::Operator;
Expand Down Expand Up @@ -601,6 +662,10 @@ mod tests {
PhysicalSortExpr::new_default(expr)
}

fn sort_column_desc(name: &str, index: usize) -> PhysicalSortExpr {
sort_column(name, index).desc()
}

fn sort_test_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Expand All @@ -615,6 +680,10 @@ mod tests {
)
}

fn sort_test_source_with_table_schema(table_schema: TableSchema) -> VortexSource {
VortexSource::new(table_schema, VortexSession::default())
}

fn octet_length_filter(schema: &Schema) -> PhysicalExprRef {
let name = Arc::new(Column::new("name", 0)) as PhysicalExprRef;
let octet_length = Arc::new(
Expand All @@ -636,10 +705,48 @@ mod tests {
.downcast_ref::<VortexSource>()
.ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?;

assert!(source.ordered);
assert!(source.sort_order.is_some());
Ok(())
}

fn assert_inexact_source(
result: SortOrderPushdownResult<Arc<dyn FileSource>>,
) -> anyhow::Result<VortexSource> {
let SortOrderPushdownResult::Inexact { inner } = result else {
anyhow::bail!("expected inexact sort pushdown");
};

Ok(inner
.downcast_ref::<VortexSource>()
.ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?
.clone())
}

fn file_with_min(path: &str, min: Option<i32>) -> PartitionedFile {
let column_stats = min.map_or_else(ColumnStatistics::new_unknown, |min| {
let value = ScalarValue::Int32(Some(min));
ColumnStatistics {
min_value: Precision::Exact(value.clone()),
max_value: Precision::Exact(value),
..ColumnStatistics::new_unknown()
}
});
let statistics = Statistics {
num_rows: Precision::Exact(1),
total_byte_size: Precision::Absent,
column_statistics: vec![column_stats, ColumnStatistics::new_unknown()],
};

PartitionedFile::new(path, 1).with_statistics(Arc::new(statistics))
}

fn file_names(files: &[PartitionedFile]) -> Vec<&str> {
files
.iter()
.map(|file| file.object_meta.location.as_ref())
.collect()
}

#[test]
fn try_pushdown_sort_returns_exact_when_ordering_is_satisfied() -> anyhow::Result<()> {
let schema = sort_test_schema();
Expand All @@ -655,7 +762,132 @@ mod tests {
anyhow::bail!("expected exact sort pushdown")
}
}
assert!(!source.ordered);
assert!(source.sort_order.is_none());
Ok(())
}

#[test]
fn try_pushdown_sort_returns_inexact_and_keeps_order() -> anyhow::Result<()> {
let schema = sort_test_schema();
let source = sort_test_source(Arc::clone(&schema));
let order = vec![sort_column("a", 0)];
let eq_properties = EquivalenceProperties::new(schema);

let updated_source =
assert_inexact_source(source.try_pushdown_sort(&order, &eq_properties)?)?;

let sort_order = updated_source
.sort_order
.as_ref()
.ok_or_else(|| anyhow::anyhow!("expected sort order for file reordering"))?;
assert_eq!(sort_order.first().expr.to_string(), "a@0");
Ok(())
}

#[test]
fn try_pushdown_sort_returns_unsupported_without_file_reorder_key() -> anyhow::Result<()> {
let schema = sort_test_schema();
let source = sort_test_source(Arc::clone(&schema));
let expr: PhysicalExprRef = Arc::new(df_expr::BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Plus,
Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(1)))),
));
let order = vec![PhysicalSortExpr::new_default(expr)];
let eq_properties = EquivalenceProperties::new(schema);

assert!(matches!(
source.try_pushdown_sort(&order, &eq_properties)?,
SortOrderPushdownResult::Unsupported
));
Ok(())
}

#[test]
fn try_pushdown_sort_returns_unsupported_for_partition_column() -> anyhow::Result<()> {
let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let partition_col = Arc::new(Field::new("part", DataType::Int32, false));
let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![partition_col]);
let source = sort_test_source_with_table_schema(table_schema);
let order = vec![sort_column("part", 1)];
let eq_properties = EquivalenceProperties::new(file_schema);

assert!(matches!(
source.try_pushdown_sort(&order, &eq_properties)?,
SortOrderPushdownResult::Unsupported
));
Ok(())
}

#[test]
fn sort_reorder_hint_does_not_force_ordered_vortex_scan() -> anyhow::Result<()> {
let schema = sort_test_schema();
let source = sort_test_source(Arc::clone(&schema));
let order = vec![sort_column("a", 0)];
let eq_properties = EquivalenceProperties::new(schema);
let mut updated_source =
assert_inexact_source(source.try_pushdown_sort(&order, &eq_properties)?)?;
updated_source.batch_size = Some(100);

let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
Arc::new(updated_source.clone()),
)
.build();

let opener = updated_source.create_vortex_opener(
Arc::new(InMemory::new()) as Arc<dyn ObjectStore>,
&config,
0,
)?;

assert!(!opener.has_output_ordering);
Ok(())
}

#[test]
fn reorder_files_sorts_by_min_statistics_ascending() -> anyhow::Result<()> {
let schema = sort_test_schema();
let source = sort_test_source(Arc::clone(&schema));
let order = vec![sort_column("a", 0)];
let eq_properties = EquivalenceProperties::new(schema);
let updated_source =
assert_inexact_source(source.try_pushdown_sort(&order, &eq_properties)?)?;

let reordered = updated_source.reorder_files(vec![
file_with_min("middle", Some(50)),
file_with_min("missing", None),
file_with_min("small", Some(10)),
file_with_min("large", Some(100)),
]);

assert_eq!(
file_names(&reordered),
vec!["small", "middle", "large", "missing"]
);
Ok(())
}

#[test]
fn reorder_files_sorts_by_min_statistics_descending() -> anyhow::Result<()> {
let schema = sort_test_schema();
let source = sort_test_source(Arc::clone(&schema));
let order = vec![sort_column_desc("a", 0)];
let eq_properties = EquivalenceProperties::new(schema);
let updated_source =
assert_inexact_source(source.try_pushdown_sort(&order, &eq_properties)?)?;

let reordered = updated_source.reorder_files(vec![
file_with_min("middle", Some(50)),
file_with_min("missing", None),
file_with_min("small", Some(10)),
file_with_min("large", Some(100)),
]);

assert_eq!(
file_names(&reordered),
vec!["large", "middle", "small", "missing"]
);
Ok(())
}

Expand Down
Loading