diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 469759527f6..4d9a14cbf70 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -1,13 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cmp::Ordering; use std::fmt::Formatter; use std::ops::Range; use std::sync::Arc; use std::sync::Weak; use datafusion_common::Result as DFResult; +use datafusion_common::ScalarValue; use datafusion_common::config::ConfigOptions; +use datafusion_datasource::PartitionedFile; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -17,9 +20,11 @@ use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::conjunction; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; use datafusion_physical_plan::SortOrderPushdownResult; @@ -200,7 +205,7 @@ pub struct VortexSource { natural_split_ranges: Arc]>>>, expression_convertor: Arc, pub(crate) vortex_reader_factory: Option>, - pub(crate) ordered: bool, + pub(crate) sort_order: Option, vx_metrics_registry: Arc, file_metadata_cache: Option>, /// Options controlling scan planning and execution behavior. 
@@ -236,7 +241,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, - ordered: false, + sort_order: None, options: VortexTableOptions::default(), } } @@ -363,7 +368,7 @@ impl VortexSource { metrics_registry: Arc::clone(&self.vx_metrics_registry), layout_readers: Arc::clone(&self.layout_readers), natural_split_ranges: Arc::clone(&self.natural_split_ranges), - has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered, + has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::clone(&self.expression_convertor), file_metadata_cache: self.file_metadata_cache.clone(), projection_pushdown: self.options.projection_pushdown, @@ -415,16 +420,50 @@ impl FileSource for VortexSource { return Ok(SortOrderPushdownResult::Unsupported); } - if eq_properties.ordering_satisfy(order.iter().cloned())? { - let mut this = self.clone(); - this.ordered = true; + let Some(sort_order) = LexOrdering::new(order.iter().cloned()) else { + return Ok(SortOrderPushdownResult::Unsupported); + }; + + if sort_reorder_key(&sort_order, &self.table_schema).is_none() { + return Ok(SortOrderPushdownResult::Unsupported); + } + let mut this = self.clone(); + this.sort_order = Some(sort_order); + + if eq_properties.ordering_satisfy(order.iter().cloned())? 
{ return Ok(SortOrderPushdownResult::Exact { inner: Arc::new(this) as Arc, }); } - Ok(SortOrderPushdownResult::Unsupported) + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(this) as Arc, + }) + } + + fn reorder_files(&self, mut files: Vec) -> Vec { + let Some((col_idx, descending)) = self + .sort_order + .as_ref() + .and_then(|sort_order| sort_reorder_key(sort_order, &self.table_schema)) + else { + return files; + }; + + files.sort_unstable_by(|a, b| { + match (file_min_value(a, col_idx), file_min_value(b, col_idx)) { + (Some(va), Some(vb)) => { + let cmp = va.partial_cmp(vb).unwrap_or(Ordering::Equal); + if descending { cmp.reverse() } else { cmp } + } + (Some(_), None) => Ordering::Less, + (None, Some(_)) => Ordering::Greater, + (None, None) => Ordering::Equal, + } + }); + + files } fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { @@ -543,13 +582,35 @@ impl FileSource for VortexSource { } } +fn sort_reorder_key(sort_order: &LexOrdering, table_schema: &TableSchema) -> Option<(usize, bool)> { + let first = sort_order.first(); + let col = first.expr.downcast_ref::()?; + let col_idx = table_schema.file_schema().index_of(col.name()).ok()?; + Some((col_idx, first.options.descending)) +} + +fn file_min_value(file: &PartitionedFile, col_idx: usize) -> Option<&ScalarValue> { + file.statistics + .as_ref()? + .column_statistics + .get(col_idx)? 
+ .min_value + .get_value() +} + #[cfg(test)] mod tests { + use std::sync::Arc; + use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::Schema; + use datafusion_common::ColumnStatistics; use datafusion_common::ScalarValue; + use datafusion_common::Statistics; use datafusion_common::config::ConfigOptions; + use datafusion_common::stats::Precision; + use datafusion_datasource::PartitionedFile; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::Operator; @@ -602,6 +663,10 @@ mod tests { PhysicalSortExpr::new_default(expr) } + fn sort_column_desc(name: &str, index: usize) -> PhysicalSortExpr { + sort_column(name, index).desc() + } + fn sort_test_schema() -> Arc { Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), @@ -616,6 +681,10 @@ mod tests { ) } + fn sort_test_source_with_table_schema(table_schema: TableSchema) -> VortexSource { + VortexSource::new(table_schema, VortexSession::default()) + } + fn octet_length_filter(schema: &Schema) -> PhysicalExprRef { let name = Arc::new(Column::new("name", 0)) as PhysicalExprRef; let octet_length = Arc::new( @@ -637,10 +706,48 @@ mod tests { .downcast_ref::() .ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?; - assert!(source.ordered); + assert!(source.sort_order.is_some()); Ok(()) } + fn assert_inexact_source( + result: SortOrderPushdownResult>, + ) -> anyhow::Result { + let SortOrderPushdownResult::Inexact { inner } = result else { + anyhow::bail!("expected inexact sort pushdown"); + }; + + Ok(inner + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("expected VortexSource"))? 
+ .clone()) + } + + fn file_with_min(path: &str, min: Option) -> PartitionedFile { + let column_stats = min.map_or_else(ColumnStatistics::new_unknown, |min| { + let value = ScalarValue::Int32(Some(min)); + ColumnStatistics { + min_value: Precision::Exact(value.clone()), + max_value: Precision::Exact(value), + ..ColumnStatistics::new_unknown() + } + }); + let statistics = Statistics { + num_rows: Precision::Exact(1), + total_byte_size: Precision::Absent, + column_statistics: vec![column_stats, ColumnStatistics::new_unknown()], + }; + + PartitionedFile::new(path, 1).with_statistics(Arc::new(statistics)) + } + + fn file_names(files: &[PartitionedFile]) -> Vec<&str> { + files + .iter() + .map(|file| file.object_meta.location.as_ref()) + .collect() + } + #[test] fn try_pushdown_sort_returns_exact_when_ordering_is_satisfied() -> anyhow::Result<()> { let schema = sort_test_schema(); @@ -656,7 +763,132 @@ mod tests { anyhow::bail!("expected exact sort pushdown") } } - assert!(!source.ordered); + assert!(source.sort_order.is_none()); + Ok(()) + } + + #[test] + fn try_pushdown_sort_returns_inexact_and_keeps_order() -> anyhow::Result<()> { + let schema = sort_test_schema(); + let source = sort_test_source(Arc::clone(&schema)); + let order = vec![sort_column("a", 0)]; + let eq_properties = EquivalenceProperties::new(schema); + + let updated_source = + assert_inexact_source(source.try_pushdown_sort(&order, &eq_properties)?)?; + + let sort_order = updated_source + .sort_order + .as_ref() + .ok_or_else(|| anyhow::anyhow!("expected sort order for file reordering"))?; + assert_eq!(sort_order.first().expr.to_string(), "a@0"); + Ok(()) + } + + #[test] + fn try_pushdown_sort_returns_unsupported_without_file_reorder_key() -> anyhow::Result<()> { + let schema = sort_test_schema(); + let source = sort_test_source(Arc::clone(&schema)); + let expr: PhysicalExprRef = Arc::new(df_expr::BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + 
/// Sorting on a column that exists only as a partition column (and therefore has
/// no entry in the file schema) must be reported as unsupported.
#[test]
fn try_pushdown_sort_returns_unsupported_for_partition_column() -> anyhow::Result<()> {
    // File schema holds only `a`; `part` is declared as a partition column.
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let partition_fields = vec![Arc::new(Field::new("part", DataType::Int32, false))];
    let source = sort_test_source_with_table_schema(TableSchema::new(
        Arc::clone(&file_schema),
        partition_fields,
    ));

    let eq_properties = EquivalenceProperties::new(file_schema);
    let order = vec![sort_column("part", 1)];

    let outcome = source.try_pushdown_sort(&order, &eq_properties)?;
    assert!(matches!(outcome, SortOrderPushdownResult::Unsupported));
    Ok(())
}
/// With a descending sort on `a`, files are arranged from largest to smallest
/// minimum, and the file without statistics still goes last.
#[test]
fn reorder_files_sorts_by_min_statistics_descending() -> anyhow::Result<()> {
    let schema = sort_test_schema();
    let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    let source = sort_test_source(schema);

    let order = vec![sort_column_desc("a", 0)];
    let pushdown = source.try_pushdown_sort(&order, &eq_properties)?;
    let updated_source = assert_inexact_source(pushdown)?;

    // (path, minimum of column `a`); `None` means the file has no statistics for it.
    let fixtures = [
        ("middle", Some(50)),
        ("missing", None),
        ("small", Some(10)),
        ("large", Some(100)),
    ];
    let files = fixtures
        .into_iter()
        .map(|(path, min)| file_with_min(path, min))
        .collect();

    let reordered = updated_source.reorder_files(files);

    assert_eq!(
        file_names(&reordered),
        vec!["large", "middle", "small", "missing"]
    );
    Ok(())
}