Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions datafusion/core/tests/repro_sort_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_expr::SortOptions;

/// Checks that the positional `sort(asc, nulls_first)` API and the
/// `sort_by(SortOptions)` API populate `asc` / `nulls_first` on the resulting
/// sort expression as expected.
#[test]
fn test_sort_api_usage() -> Result<()> {
    let expr = col("a");

    // Old API: sort(asc, nulls_first)
    // "True, False" -> Ascending, Nulls Last
    let sort_expr = expr.clone().sort(true, false);
    assert!(sort_expr.asc);
    assert!(!sort_expr.nulls_first);

    // New API: sort_by with SortOptions
    // Descending, Nulls First
    let sort_expr = expr
        .clone()
        .sort_by(SortOptions::new().desc().nulls_first());
    assert!(!sort_expr.asc);
    assert!(sort_expr.nulls_first);

    // New API: Ascending, Nulls Last (default)
    let sort_expr = expr.sort_by(SortOptions::new());
    assert!(sort_expr.asc);
    assert!(!sort_expr.nulls_first);

    Ok(())
}


11 changes: 11 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,17 @@ impl Expr {
Sort::new(self, asc, nulls_first)
}

/// Build a `Sort` over this expression from a `SortOptions` value.
///
/// The `descending` flag is inverted to obtain the ascending flag handed to
/// `Sort::new`; `nulls_first` is forwarded unchanged.
///
/// ```
/// # use datafusion_expr::{col, SortOptions};
/// let sort_expr = col("foo").sort_by(SortOptions::new().desc().nulls_first());
/// ```
pub fn sort_by(self, options: crate::sort_options::SortOptions) -> Sort {
    let crate::sort_options::SortOptions {
        descending,
        nulls_first,
    } = options;
    let asc = !descending;
    Sort::new(self, asc, nulls_first)
}


/// Return `IsTrue(Box(self))`
pub fn is_true(self) -> Expr {
Expr::IsTrue(Box::new(self))
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub mod dml {
pub mod planner;
pub mod registry;
pub mod simplify;
pub mod sort_options;
pub mod sort_properties {
pub use datafusion_expr_common::sort_properties::*;
}
Expand Down Expand Up @@ -128,6 +129,7 @@ pub use udaf::{
pub use udf::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl};
pub use udwf::{LimitEffect, ReversedUDWF, WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
pub use sort_options::SortOptions;

#[cfg(test)]
#[ctor::ctor]
Expand Down
53 changes: 53 additions & 0 deletions datafusion/expr/src/sort_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use arrow::compute::SortOptions as ArrowSortOptions;

/// Sort direction and null placement for a sort expression.
///
/// Values are built by chaining the by-value setters below; every setter
/// returns the updated copy, so the result must be used:
///
/// ```
/// # use datafusion_expr::SortOptions;
/// let options = SortOptions::new().desc().nulls_first();
/// assert!(options.descending);
/// assert!(options.nulls_first);
/// ```
///
/// `SortOptions::default()` and [`SortOptions::new`] produce the same value
/// (ascending, nulls last).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SortOptions {
    /// `true` sorts in descending order; `false` (the default) is ascending.
    pub descending: bool,
    /// `true` places nulls first; `false` (the default) places them last.
    pub nulls_first: bool,
}

impl SortOptions {
    /// Create a new `SortOptions` with default values (Ascending, Nulls Last).
    #[must_use]
    pub const fn new() -> Self {
        Self {
            descending: false,
            nulls_first: false,
        }
    }

    /// Return a copy with the sort order set to descending.
    #[must_use]
    pub const fn desc(mut self) -> Self {
        self.descending = true;
        self
    }

    /// Return a copy with the sort order set to ascending.
    #[must_use]
    pub const fn asc(mut self) -> Self {
        self.descending = false;
        self
    }

    /// Return a copy with nulls placed first.
    #[must_use]
    pub const fn nulls_first(mut self) -> Self {
        self.nulls_first = true;
        self
    }

    /// Return a copy with nulls placed last.
    #[must_use]
    pub const fn nulls_last(mut self) -> Self {
        self.nulls_first = false;
        self
    }
}

impl From<SortOptions> for ArrowSortOptions {
fn from(options: SortOptions) -> Self {
ArrowSortOptions {
descending: options.descending,
nulls_first: options.nulls_first,
}
}
}
70 changes: 54 additions & 16 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,22 +192,25 @@ impl OrderingEquivalenceClass {
/// same number of columns, and fields at the same index have the same type
/// in both schemas.
///
/// Each ordering is truncated at the first sort expression that the
/// `with_new_schema` helper fails to rewrite against `schema`; the leading
/// expressions before that point are kept. Orderings that end up empty are
/// dropped entirely. This method itself never returns `Err`; the `Result`
/// return type is kept so existing callers continue to compile.
pub fn with_new_schema(mut self, schema: &SchemaRef) -> Result<Self> {
    self.orderings = self
        .orderings
        .into_iter()
        .filter_map(|ordering| {
            // Only a leading prefix of a lexicographic ordering is itself a
            // valid ordering, so stop at the first expression that cannot
            // be rewritten instead of skipping over it.
            let prefix: Vec<_> = ordering
                .into_iter()
                .map_while(|mut sort_expr| {
                    let rewritten = with_new_schema(sort_expr.expr, schema).ok()?;
                    sort_expr.expr = rewritten;
                    Some(sort_expr)
                })
                .collect();
            // `LexOrdering::new` yields `None` for an empty prefix, which
            // removes the ordering from the class.
            LexOrdering::new(prefix)
        })
        .collect();
    Ok(self)
}

Expand Down Expand Up @@ -950,6 +953,41 @@ mod tests {
assert!(expected.contains(&elem), "{}", err_msg);
}
}
Ok(())
}

/// Rewriting the ordering `[a, b]` against a schema that only contains `a`
/// must keep a single ordering consisting of just `a`.
#[test]
fn test_with_new_schema_unaligned() -> Result<()> {
    let options = SortOptions::default();
    // Helper producing a sort expression over the column `name` at `index`.
    let sort_on = |name: &str, index: usize| PhysicalSortExpr {
        expr: Arc::new(Column::new(name, index)) as _,
        options,
    };

    let original =
        OrderingEquivalenceClass::new(vec![vec![sort_on("a", 0), sort_on("b", 1)]]);

    // New schema only has "a"
    let narrowed_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let rewritten = original.with_new_schema(&narrowed_schema)?;

    // Exactly one ordering with exactly one sort expression survives.
    assert_eq!(rewritten.len(), 1);
    assert_eq!(rewritten[0].len(), 1);

    let surviving_column = rewritten[0][0]
        .expr
        .as_any()
        .downcast_ref::<Column>()
        .unwrap();
    assert_eq!(surviving_column.name(), "a");

    Ok(())
}
Expand Down
62 changes: 21 additions & 41 deletions datafusion/physical-expr/src/equivalence/properties/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{

use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err};
use datafusion_common::{Constraint, Constraints, HashMap, Result};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_physical_expr_common::sort_expr::options_compatible;
Expand Down Expand Up @@ -1263,56 +1263,36 @@ impl EquivalenceProperties {
/// Transforms this `EquivalenceProperties` by mapping columns in the
/// original schema to columns in the new schema by index.
pub fn with_new_schema(mut self, schema: SchemaRef) -> Result<Self> {
// The new schema and the original schema is aligned when they have the
// same number of columns, and fields at the same index have the same
// type in both schemas.
let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
&& self
.schema
.fields
.iter()
.zip(schema.fields.iter())
.all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
if !schemas_aligned {
// Rewriting equivalence properties in terms of new schema is not
// safe when schemas are not aligned:
return plan_err!(
"Schemas have to be aligned to rewrite equivalences:\n Old schema: {}\n New schema: {}",
self.schema,
schema
);
}

// Rewrite equivalence classes according to the new schema:
let mut eq_classes = vec![];
for mut eq_class in self.eq_group {
// Rewrite the expressions in the equivalence class:
eq_class.exprs = eq_class
.exprs
.into_iter()
.map(|expr| with_new_schema(expr, &schema))
.collect::<Result<_>>()?;
// Rewrite the constant value (if available and known):
let data_type = eq_class
.canonical_expr()
.map(|e| e.data_type(&schema))
.transpose()?;
if let (Some(data_type), Some(AcrossPartitions::Uniform(Some(value)))) =
(data_type, &mut eq_class.constant)
{
*value = value.cast_to(&data_type)?;
let mut new_exprs = vec![];
for expr in eq_class.exprs {
// Keep the expression if it still exists in the new schema
if let Ok(new_expr) = with_new_schema(expr, &schema) {
new_exprs.push(new_expr);
}
}

// Keep the class if it has more than one expression or is a constant
if new_exprs.len() > 1 || (new_exprs.len() == 1 && eq_class.constant.is_some()) {
eq_class.exprs = new_exprs.into_iter().collect();

// Update the constant value type if needed
if let Ok(Some(data_type)) = eq_class.canonical_expr().map(|e| e.data_type(&schema)).transpose() {
if let Some(AcrossPartitions::Uniform(Some(value))) = &mut eq_class.constant {
*value = value.cast_to(&data_type)?;
}
}
eq_classes.push(eq_class);
}
eq_classes.push(eq_class);
}
self.eq_group = eq_classes.into();

// Rewrite orderings according to new schema:
// Update the orderings
self.oeq_class = self.oeq_class.with_new_schema(&schema)?;
self.oeq_cache.normal_cls = self.oeq_cache.normal_cls.with_new_schema(&schema)?;

// Update the schema:
self.schema = schema;

Ok(self)
}
}
Expand Down