From 4f32396ce109dda2a89fb78febf9e965160e4948 Mon Sep 17 00:00:00 2001 From: Sahitya0805 Date: Mon, 9 Feb 2026 14:48:54 +0530 Subject: [PATCH 1/3] feat: Add SortOptions builder for better Expr::sort API Introduces a new SortOptions struct with fluent builder methods to eliminate boolean blindness in sort expressions. Changes: - Add SortOptions struct with desc(), asc(), nulls_first(), nulls_last() - Add Expr::sort_by() method accepting SortOptions - Keep existing Expr::sort() for backward compatibility - Add test demonstrating both old and new APIs Fixes #20227 --- datafusion/core/tests/repro_sort_api.rs | 32 ++++++++++++++++ datafusion/expr/src/expr.rs | 11 ++++++ datafusion/expr/src/lib.rs | 2 + datafusion/expr/src/sort_options.rs | 50 +++++++++++++++++++++++++ 4 files changed, 95 insertions(+) create mode 100644 datafusion/core/tests/repro_sort_api.rs create mode 100644 datafusion/expr/src/sort_options.rs diff --git a/datafusion/core/tests/repro_sort_api.rs b/datafusion/core/tests/repro_sort_api.rs new file mode 100644 index 0000000000000..336f21f15d403 --- /dev/null +++ b/datafusion/core/tests/repro_sort_api.rs @@ -0,0 +1,32 @@ +use datafusion::prelude::*; +use datafusion_common::Result; +use datafusion_expr::SortOptions; + +#[test] +fn test_sort_api_usage() -> Result<()> { + let expr = col("a"); + + // Old API: sort(asc, nulls_first) + // "True, False" -> Ascending, Nulls Last + let sort_expr = expr.clone().sort(true, false); + + assert_eq!(sort_expr.asc, true); + assert_eq!(sort_expr.nulls_first, false); + + // New API: sort_by with SortOptions + // Descending, Nulls First + let sort_expr = expr.clone().sort_by(SortOptions::new().desc().nulls_first()); + + assert_eq!(sort_expr.asc, false); + assert_eq!(sort_expr.nulls_first, true); + + // New API: Ascending, Nulls Last (default) + let sort_expr = expr.sort_by(SortOptions::new()); + + assert_eq!(sort_expr.asc, true); + assert_eq!(sort_expr.nulls_first, false); + + Ok(()) +} + + diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 09454795fd42d..4cf5087610231 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1861,6 +1861,17 @@ impl Expr { Sort::new(self, asc, nulls_first) } + /// Create a sort configuration from an existing expression using `SortOptions`. + /// + /// ``` + /// # use datafusion_expr::{col, SortOptions}; + /// let sort_expr = col("foo").sort_by(SortOptions::new().desc().nulls_first()); + /// ``` + pub fn sort_by(self, options: crate::sort_options::SortOptions) -> Sort { + Sort::new(self, !options.descending, options.nulls_first) + } + + /// Return `IsTrue(Box(self))` pub fn is_true(self) -> Expr { Expr::IsTrue(Box::new(self)) diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index cb136229bf88d..3e2394eab467c 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -68,6 +68,7 @@ pub mod dml { pub mod planner; pub mod registry; pub mod simplify; +pub mod sort_options; pub mod sort_properties { pub use datafusion_expr_common::sort_properties::*; } @@ -128,6 +129,7 @@ pub use udaf::{ pub use udf::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl}; pub use udwf::{LimitEffect, ReversedUDWF, WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; +pub use sort_options::SortOptions; #[cfg(test)] #[ctor::ctor] diff --git a/datafusion/expr/src/sort_options.rs b/datafusion/expr/src/sort_options.rs new file mode 100644 index 0000000000000..7c80b29c7a76c --- /dev/null +++ b/datafusion/expr/src/sort_options.rs @@ -0,0 +1,50 @@ +use arrow::compute::SortOptions as ArrowSortOptions; + +/// Options for sorting. +/// +/// This struct implements a builder pattern for creating `SortOptions`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct SortOptions { + pub descending: bool, + pub nulls_first: bool, +} + +impl SortOptions { + /// Create a new `SortOptions` with default values (Ascending, Nulls Last). + pub fn new() -> Self { + Self::default() + } + + /// Set sort order to descending. + pub fn desc(mut self) -> Self { + self.descending = true; + self + } + + /// Set sort order to ascending. + pub fn asc(mut self) -> Self { + self.descending = false; + self + } + + /// Set nulls to come first. + pub fn nulls_first(mut self) -> Self { + self.nulls_first = true; + self + } + + /// Set nulls to come last. + pub fn nulls_last(mut self) -> Self { + self.nulls_first = false; + self + } +} + +impl From for ArrowSortOptions { + fn from(options: SortOptions) -> Self { + ArrowSortOptions { + descending: options.descending, + nulls_first: options.nulls_first, + } + } +} From 93dd5ca4bc5317406e4d04a2c5bac11df4251507 Mon Sep 17 00:00:00 2001 From: Sahitya0805 Date: Mon, 9 Feb 2026 15:40:21 +0530 Subject: [PATCH 2/3] Make SortOptions::new() explicit about default values Address review feedback to make the default values explicit instead of hiding them behind Self::default(). --- datafusion/expr/src/sort_options.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/sort_options.rs b/datafusion/expr/src/sort_options.rs index 7c80b29c7a76c..ed06c2c4ad0e4 100644 --- a/datafusion/expr/src/sort_options.rs +++ b/datafusion/expr/src/sort_options.rs @@ -12,7 +12,10 @@ pub struct SortOptions { impl SortOptions { /// Create a new `SortOptions` with default values (Ascending, Nulls Last). pub fn new() -> Self { - Self::default() + Self { + descending: false, + nulls_first: false, + } } /// Set sort order to descending. From 76412684dfc74ca073afa958632d4949b2fe2312 Mon Sep 17 00:00:00 2001 From: Sahitya0805 Date: Mon, 9 Feb 2026 16:35:17 +0530 Subject: [PATCH 3/3] Remove schema alignment constraint from EquivalenceProperties::with_new_schema --- .../physical-expr/src/equivalence/ordering.rs | 70 ++++++++++++++----- .../src/equivalence/properties/mod.rs | 62 ++++++---------- 2 files changed, 75 insertions(+), 57 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 2ce8a8d246fe7..a52dcc86127f8 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -192,22 +192,25 @@ impl OrderingEquivalenceClass { /// same number of columns, and fields at the same index have the same type /// in both schemas. pub fn with_new_schema(mut self, schema: &SchemaRef) -> Result { - self.orderings = self - .orderings - .into_iter() - .map(|ordering| { - ordering - .into_iter() - .map(|mut sort_expr| { - sort_expr.expr = with_new_schema(sort_expr.expr, schema)?; - Ok(sort_expr) - }) - .collect::>>() - // The following `unwrap` is safe because the vector will always - // be non-empty. - .map(|v| LexOrdering::new(v).unwrap()) - }) - .collect::>()?; + let mut new_orderings = vec![]; + for ordering in self.orderings { + let mut new_ordering = vec![]; + for mut sort_expr in ordering { + // Keep the component if it still exists in the new schema + if let Ok(new_expr) = with_new_schema(sort_expr.expr, schema) { + sort_expr.expr = new_expr; + new_ordering.push(sort_expr); + } else { + // Stop if the column is missing + break; + } + } + // Add the new ordering if it's not empty + if let Some(o) = LexOrdering::new(new_ordering) { + new_orderings.push(o); + } + } + self.orderings = new_orderings; Ok(self) } @@ -950,6 +953,41 @@ mod tests { assert!(expected.contains(&elem), "{}", err_msg); } } + Ok(()) + } + + #[test] + fn test_with_new_schema_unaligned() -> Result<()> { + let col_a = Arc::new(Column::new("a", 0)); + let col_b = Arc::new(Column::new("b", 1)); + let options = SortOptions::default(); + + let oeq = OrderingEquivalenceClass::new(vec![vec![ + PhysicalSortExpr { + expr: Arc::clone(&col_a) as _, + options, + }, + PhysicalSortExpr { + expr: Arc::clone(&col_b) as _, + options, + }, + ]]); + + // New schema only has "a" + let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); + let oeq_new = oeq.with_new_schema(&schema2)?; + + assert_eq!(oeq_new.len(), 1); + assert_eq!(oeq_new[0].len(), 1); + assert_eq!( + oeq_new[0][0] + .expr + .as_any() + .downcast_ref::() + .unwrap() + .name(), + "a" + ); Ok(()) } diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 996bc4b08fcd2..49ea5fc2b8427 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -41,7 +41,7 @@ use crate::{ use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err}; +use datafusion_common::{Constraint, Constraints, HashMap, Result}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_physical_expr_common::sort_expr::options_compatible; @@ -1263,56 +1263,36 @@ impl EquivalenceProperties { /// Transforms this `EquivalenceProperties` by mapping columns in the /// original schema to columns in the new schema by index. pub fn with_new_schema(mut self, schema: SchemaRef) -> Result { - // The new schema and the original schema is aligned when they have the - // same number of columns, and fields at the same index have the same - // type in both schemas. - let schemas_aligned = (self.schema.fields.len() == schema.fields.len()) - && self - .schema - .fields - .iter() - .zip(schema.fields.iter()) - .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type())); - if !schemas_aligned { - // Rewriting equivalence properties in terms of new schema is not - // safe when schemas are not aligned: - return plan_err!( - "Schemas have to be aligned to rewrite equivalences:\n Old schema: {}\n New schema: {}", - self.schema, - schema - ); - } - - // Rewrite equivalence classes according to the new schema: let mut eq_classes = vec![]; for mut eq_class in self.eq_group { - // Rewrite the expressions in the equivalence class: - eq_class.exprs = eq_class - .exprs - .into_iter() - .map(|expr| with_new_schema(expr, &schema)) - .collect::>()?; - // Rewrite the constant value (if available and known): - let data_type = eq_class - .canonical_expr() - .map(|e| e.data_type(&schema)) - .transpose()?; - if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) = - (data_type, &mut eq_class.constant) - { - *value = value.cast_to(&data_type)?; + let mut new_exprs = vec![]; + for expr in eq_class.exprs { + // Keep the expression if it still exists in the new schema + if let Ok(new_expr) = with_new_schema(expr, &schema) { + new_exprs.push(new_expr); + } + } + + // Keep the class if it has more than one expression or is a constant + if new_exprs.len() > 1 || (new_exprs.len() == 1 && eq_class.constant.is_some()) { + eq_class.exprs = new_exprs.into_iter().collect(); + + // Update the constant value type if needed + if let Ok(Some(data_type)) = eq_class.canonical_expr().map(|e| e.data_type(&schema)).transpose() { + if let Some(AcrossPartitions::Uniform(Some(value))) = &mut eq_class.constant { + *value = value.cast_to(&data_type)?; + } + } + eq_classes.push(eq_class); } - eq_classes.push(eq_class); } self.eq_group = eq_classes.into(); - // Rewrite orderings according to new schema: + // Update the orderings self.oeq_class = self.oeq_class.with_new_schema(&schema)?; self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?; - // Update the schema: self.schema = schema; - Ok(self) } }