diff --git a/datafusion-examples/examples/relation_planner/match_recognize.rs b/datafusion-examples/examples/relation_planner/match_recognize.rs index c4b3d522efc17..e64de9e841b61 100644 --- a/datafusion-examples/examples/relation_planner/match_recognize.rs +++ b/datafusion-examples/examples/relation_planner/match_recognize.rs @@ -374,7 +374,7 @@ impl RelationPlanner for MatchRecognizePlanner { .iter() .map(|m| { let alias = ctx.normalize_ident(m.alias.clone()); - let expr = ctx.sql_to_expr(m.expr.clone(), schema.as_ref())?; + let expr = ctx.sql_to_expr(m.expr.clone(), &schema)?; Ok((alias, expr)) }) .collect::>()?; @@ -384,7 +384,7 @@ impl RelationPlanner for MatchRecognizePlanner { .iter() .map(|s| { let name = ctx.normalize_ident(s.symbol.clone()); - let expr = ctx.sql_to_expr(s.definition.clone(), schema.as_ref())?; + let expr = ctx.sql_to_expr(s.definition.clone(), &schema)?; Ok((name, expr)) }) .collect::>()?; diff --git a/datafusion-examples/examples/relation_planner/pivot_unpivot.rs b/datafusion-examples/examples/relation_planner/pivot_unpivot.rs index 2e1696956bf62..decb557fcf4e8 100644 --- a/datafusion-examples/examples/relation_planner/pivot_unpivot.rs +++ b/datafusion-examples/examples/relation_planner/pivot_unpivot.rs @@ -364,7 +364,7 @@ fn plan_pivot( // Parse aggregate functions let aggregates: Vec = aggregate_functions .iter() - .map(|agg| ctx.sql_to_expr(agg.expr.clone(), schema.as_ref())) + .map(|agg| ctx.sql_to_expr(agg.expr.clone(), schema)) .collect::>()?; // Get the pivot column (only single-column pivot supported) @@ -373,7 +373,7 @@ fn plan_pivot( "Only single-column PIVOT is supported" )); } - let pivot_col = ctx.sql_to_expr(value_column[0].clone(), schema.as_ref())?; + let pivot_col = ctx.sql_to_expr(value_column[0].clone(), schema)?; let pivot_col_name = extract_column_name(&pivot_col)?; // Parse pivot values @@ -385,7 +385,7 @@ fn plan_pivot( .alias .as_ref() .map(|id| ctx.normalize_ident(id.clone())); - let expr = ctx.sql_to_expr(item.expr.clone(), schema.as_ref())?; + let expr = ctx.sql_to_expr(item.expr.clone(), schema)?; Ok((alias, expr)) }) .collect::>>()?, @@ -495,7 +495,7 @@ fn plan_unpivot( .as_ref() .map(|id| ctx.normalize_ident(id.clone())) .unwrap_or_else(|| c.expr.to_string()); - let expr = ctx.sql_to_expr(c.expr.clone(), schema.as_ref())?; + let expr = ctx.sql_to_expr(c.expr.clone(), schema)?; let col_name = extract_column_name(&expr)?; Ok((col_name.to_string(), label)) }) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 9560616c1b6da..2d16c3eb07ff0 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -43,8 +43,8 @@ use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions}; use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; use datafusion_common::tree_node::TreeNode; use datafusion_common::{ - DFSchema, DataFusionError, ResolvedTableReference, TableReference, config_err, - exec_err, plan_datafusion_err, + DFSchema, DFSchemaRef, DataFusionError, ResolvedTableReference, TableReference, + config_err, exec_err, plan_datafusion_err, }; use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; @@ -568,8 +568,9 @@ impl SessionState { let dialect = self.config.options().sql_parser.dialect; let sql_expr = self.sql_to_expr_with_alias(sql, &dialect)?; + let df_schema = Arc::new(df_schema.clone()); - self.create_logical_expr_from_sql_expr(sql_expr, df_schema) + self.create_logical_expr_from_sql_expr(sql_expr, &df_schema) } /// Creates a datafusion style AST [`Expr`] from a SQL expression. @@ -577,7 +578,7 @@ impl SessionState { pub fn create_logical_expr_from_sql_expr( &self, sql_expr: SQLExprWithAlias, - df_schema: &DFSchema, + df_schema: &DFSchemaRef, ) -> datafusion_common::Result { let provider = SessionContextProvider { state: self, @@ -2187,7 +2188,7 @@ mod tests { let sql = "[1,2,3]"; let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - let df_schema = DFSchema::try_from(schema)?; + let df_schema = Arc::new(DFSchema::try_from(schema)?); let dialect = state.config.options().sql_parser.dialect; let sql_expr = state.sql_to_expr(sql, &dialect)?; @@ -2216,7 +2217,7 @@ mod tests { }; let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - let df_schema = DFSchema::try_from(schema).unwrap(); + let df_schema = Arc::new(DFSchema::try_from(schema).unwrap()); let dialect = state.config.options().sql_parser.dialect; let query = SqlToRel::new_with_options(&provider, state.get_parser_options()); diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index 837a9eefe289f..0f5e75894dedf 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -30,7 +30,7 @@ use crate::{ use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef}; use datafusion_common::datatype::DataTypeExt; use datafusion_common::{ - DFSchema, Result, TableReference, config::ConfigOptions, + DFSchema, DFSchemaRef, Result, TableReference, config::ConfigOptions, file_options::file_type::FileType, not_impl_err, }; #[cfg(feature = "sql")] @@ -408,14 +408,14 @@ pub trait RelationPlannerContext { /// Converts a SQL expression into a logical expression using the current /// planner context. - fn sql_to_expr(&mut self, expr: SQLExpr, schema: &DFSchema) -> Result; + fn sql_to_expr(&mut self, expr: SQLExpr, schema: &DFSchemaRef) -> Result; /// Converts a SQL expression into a logical expression without DataFusion /// rewrites. fn sql_expr_to_logical_expr( &mut self, expr: SQLExpr, - schema: &DFSchema, + schema: &DFSchemaRef, ) -> Result; /// Normalizes an identifier according to session settings. diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 641f3bb8dcad1..ba630b6b0ba92 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -19,7 +19,7 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use arrow::datatypes::DataType; use datafusion_common::{ - DFSchema, Dependency, Diagnostic, Result, Span, internal_datafusion_err, + DFSchema, DFSchemaRef, Dependency, Diagnostic, Result, Span, internal_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, }; use datafusion_expr::{ @@ -225,7 +225,7 @@ impl SqlToRel<'_, S> { pub(super) fn sql_function_to_expr( &self, function: SQLFunction, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let function_args = FunctionArgs::try_new(function)?; @@ -695,7 +695,7 @@ impl SqlToRel<'_, S> { &self, expr: SQLExpr, fn_name: &str, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let fun = self @@ -734,7 +734,7 @@ impl SqlToRel<'_, S> { fn sql_fn_arg_to_logical_expr( &self, sql: FunctionArg, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let (expr, _) = @@ -745,7 +745,7 @@ impl SqlToRel<'_, S> { fn sql_fn_arg_to_logical_expr_with_name( &self, sql: FunctionArg, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result<(Expr, Option)> { match sql { @@ -840,7 +840,7 @@ impl SqlToRel<'_, S> { pub(super) fn function_args_to_expr( &self, args: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result> { args.into_iter() @@ -851,7 +851,7 @@ impl SqlToRel<'_, S> { pub(super) fn function_args_to_expr_with_names( &self, args: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result<(Vec, Vec>)> { let results: Result)>> = args @@ -872,7 +872,7 @@ impl SqlToRel<'_, S> { within_group: Vec, mut args: Vec, mut arg_names: Vec>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let within_group = self.order_by_to_sort_expr( diff --git a/datafusion/sql/src/expr/grouping_set.rs b/datafusion/sql/src/expr/grouping_set.rs index bedbf2a7d3470..a4a68017cd27c 100644 --- a/datafusion/sql/src/expr/grouping_set.rs +++ b/datafusion/sql/src/expr/grouping_set.rs @@ -16,8 +16,8 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::plan_err; -use datafusion_common::{DFSchema, Result}; +use datafusion_common::Result; +use datafusion_common::{DFSchemaRef, plan_err}; use datafusion_expr::{Expr, GroupingSet}; use sqlparser::ast::Expr as SQLExpr; @@ -25,7 +25,7 @@ impl SqlToRel<'_, S> { pub(super) fn sql_grouping_sets_to_expr( &self, exprs: Vec>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let args: Result>> = exprs @@ -42,7 +42,7 @@ impl SqlToRel<'_, S> { pub(super) fn sql_rollup_to_expr( &self, exprs: Vec>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let args: Result> = exprs @@ -63,7 +63,7 @@ impl SqlToRel<'_, S> { pub(super) fn sql_cube_to_expr( &self, exprs: Vec>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let args: Result> = exprs diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 34fbe2edf8dd9..303e73e6d5136 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -18,7 +18,7 @@ use arrow::datatypes::FieldRef; use datafusion_common::datatype::DataTypeExt; use datafusion_common::{ - Column, DFSchema, Result, Span, TableReference, assert_or_internal_err, + Column, DFSchema, DFSchemaRef, Result, Span, TableReference, assert_or_internal_err, exec_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, }; use datafusion_expr::planner::PlannerResult; @@ -33,7 +33,7 @@ impl SqlToRel<'_, S> { pub(super) fn sql_identifier_to_expr( &self, id: Ident, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let id_span = id.span; @@ -101,7 +101,7 @@ impl SqlToRel<'_, S> { pub(crate) fn sql_compound_identifier_to_expr( &self, ids: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { assert_or_internal_err!(ids.len() >= 2, "Not a compound identifier: {ids:?}"); @@ -226,7 +226,7 @@ impl SqlToRel<'_, S> { operand: Option>, conditions: Vec, else_result: Option>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let expr = if let Some(e) = operand { diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index dbf2ce67732ec..e3df599e3cc4a 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -27,8 +27,8 @@ use sqlparser::ast::{ }; use datafusion_common::{ - DFSchema, Result, ScalarValue, internal_datafusion_err, internal_err, not_impl_err, - plan_err, + DFSchema, DFSchemaRef, Result, ScalarValue, internal_datafusion_err, internal_err, + not_impl_err, plan_err, }; use datafusion_expr::expr::ScalarFunction; @@ -56,7 +56,7 @@ impl SqlToRel<'_, S> { pub(crate) fn sql_expr_to_logical_expr_with_alias( &self, sql: SQLExprWithAlias, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let mut expr = @@ -69,7 +69,7 @@ impl SqlToRel<'_, S> { pub(crate) fn sql_expr_to_logical_expr( &self, sql: SQLExpr, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { enum StackEntry { @@ -150,7 +150,7 @@ impl SqlToRel<'_, S> { pub fn sql_to_expr_with_alias( &self, sql: SQLExprWithAlias, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let mut expr = @@ -165,7 +165,7 @@ impl SqlToRel<'_, S> { pub fn sql_to_expr( &self, sql: SQLExpr, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { // The location of the original SQL expression in the source code @@ -177,7 +177,7 @@ impl SqlToRel<'_, S> { } /// Rewrite aliases which are not-complete (e.g. ones that only include only table qualifier in a schema.table qualified relation) - fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr { + fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchemaRef) -> Expr { match expr { Expr::Column(col) => match &col.relation { Some(q) => { @@ -204,7 +204,7 @@ impl SqlToRel<'_, S> { fn sql_expr_to_logical_expr_internal( &self, sql: SQLExpr, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { // NOTE: This function is called recursively, so each match arm body should be as @@ -652,7 +652,7 @@ impl SqlToRel<'_, S> { /// Parses a struct(..) expression and plans it creation fn parse_struct( &self, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, values: Vec, fields: &[StructField], @@ -681,7 +681,7 @@ impl SqlToRel<'_, S> { fn parse_tuple( &self, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, values: Vec, ) -> Result { @@ -702,7 +702,7 @@ impl SqlToRel<'_, S> { &self, substr_expr: SQLExpr, str_expr: SQLExpr, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let substr = @@ -724,7 +724,7 @@ impl SqlToRel<'_, S> { fn try_plan_dictionary_literal( &self, fields: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let mut keys = vec![]; @@ -753,7 +753,7 @@ impl SqlToRel<'_, S> { fn try_plan_map_literal( &self, entries: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let mut exprs: Vec<_> = entries @@ -777,7 +777,7 @@ impl SqlToRel<'_, S> { fn create_named_struct_expr( &self, values: Vec, - input_schema: &DFSchema, + input_schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result> { Ok(values @@ -818,7 +818,7 @@ impl SqlToRel<'_, S> { fn create_struct_expr( &self, values: Vec, - input_schema: &DFSchema, + input_schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result> { values @@ -834,7 +834,7 @@ impl SqlToRel<'_, S> { expr: SQLExpr, list: Vec, negated: bool, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let list_expr = list @@ -856,7 +856,7 @@ impl SqlToRel<'_, S> { expr: SQLExpr, pattern: SQLExpr, escape_char: Option, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, case_insensitive: bool, any: bool, @@ -891,7 +891,7 @@ impl SqlToRel<'_, S> { expr: SQLExpr, pattern: SQLExpr, escape_char: Option, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?; @@ -925,7 +925,7 @@ impl SqlToRel<'_, S> { trim_where: Option, trim_what: Option>, trim_characters: Option>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?; @@ -975,7 +975,7 @@ impl SqlToRel<'_, S> { overlay_what: SQLExpr, overlay_from: SQLExpr, overlay_for: Option>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?; @@ -1005,7 +1005,7 @@ impl SqlToRel<'_, S> { expr: SQLExpr, data_type: &SQLDataType, format: Option, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { if let Some(format) = format { @@ -1058,7 +1058,7 @@ impl SqlToRel<'_, S> { &self, root: SQLExpr, mut access_chain: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result<(Expr, Vec)> { let SQLExpr::Identifier(root_ident) = root else { @@ -1099,7 +1099,7 @@ impl SqlToRel<'_, S> { &self, root: SQLExpr, access_chain: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let (root, access_chain) = self.extract_root_and_access_chain( @@ -1314,7 +1314,7 @@ mod tests { paste::item! { #[test] fn []() { - let schema = DFSchema::empty(); + let schema = Arc::new(DFSchema::empty()); let mut planner_context = PlannerContext::default(); let expr_str = (0..$num_expr) @@ -1352,7 +1352,7 @@ mod tests { test_stack_overflow!(8192); #[test] fn test_sql_to_expr_with_alias() { - let schema = DFSchema::empty(); + let schema = Arc::new(DFSchema::empty()); let mut planner_context = PlannerContext::default(); let expr_str = "SUM(int_col) as sum_int_col"; diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index faecfbcfecc05..155f2d8be5782 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -17,13 +17,14 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{ - Column, DFSchema, Result, not_impl_err, plan_datafusion_err, plan_err, + Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_datafusion_err, plan_err, }; use datafusion_expr::expr::Sort; use datafusion_expr::{Expr, SortExpr}; use sqlparser::ast::{ Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan, }; +use std::sync::Arc; impl SqlToRel<'_, S> { /// Convert sql [OrderByExpr] to `Vec`. @@ -42,7 +43,7 @@ impl SqlToRel<'_, S> { pub(crate) fn order_by_to_sort_expr( &self, order_by_exprs: Vec, - input_schema: &DFSchema, + input_schema: &DFSchemaRef, planner_context: &mut PlannerContext, literal_to_column: bool, additional_schema: Option<&DFSchema>, @@ -54,8 +55,8 @@ impl SqlToRel<'_, S> { let mut combined_schema; let order_by_schema = match additional_schema { Some(schema) => { - combined_schema = input_schema.clone(); - combined_schema.merge(schema); + combined_schema = Arc::clone(input_schema); + Arc::make_mut(&mut combined_schema).merge(schema); &combined_schema } None => input_schema, diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 6837b2671cb1c..ec9cd5d0dda94 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -16,7 +16,7 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{DFSchema, Diagnostic, Result, Span, Spans, plan_err}; +use datafusion_common::{DFSchemaRef, Diagnostic, Result, Span, Spans, plan_err}; use datafusion_expr::expr::{Exists, InSubquery, SetComparison, SetQuantifier}; use datafusion_expr::{Expr, LogicalPlan, Subquery}; use sqlparser::ast::Expr as SQLExpr; @@ -28,11 +28,11 @@ impl SqlToRel<'_, S> { &self, subquery: Query, negated: bool, - input_schema: &DFSchema, + input_schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(Some(Arc::clone(input_schema))); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.set_outer_query_schema(old_outer_query_schema); @@ -51,11 +51,11 @@ impl SqlToRel<'_, S> { expr: SQLExpr, subquery: Query, negated: bool, - input_schema: &DFSchema, + input_schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(Some(Arc::clone(input_schema))); let mut spans = Spans::new(); if let SetExpr::Select(select) = &subquery.body.as_ref() { @@ -95,11 +95,11 @@ impl SqlToRel<'_, S> { pub(super) fn parse_scalar_subquery( &self, subquery: Query, - input_schema: &DFSchema, + input_schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(Some(Arc::clone(input_schema))); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -169,11 +169,11 @@ impl SqlToRel<'_, S> { subquery: Query, compare_op: &BinaryOperator, quantifier: SetQuantifier, - input_schema: &DFSchema, + input_schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(Some(Arc::clone(input_schema))); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { diff --git a/datafusion/sql/src/expr/substring.rs b/datafusion/sql/src/expr/substring.rs index d3b56097c1f58..46773ccc2fa68 100644 --- a/datafusion/sql/src/expr/substring.rs +++ b/datafusion/sql/src/expr/substring.rs @@ -16,7 +16,7 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{DFSchema, Result, ScalarValue}; +use datafusion_common::{DFSchemaRef, Result, ScalarValue}; use datafusion_common::{not_impl_err, plan_err}; use datafusion_expr::{Expr, planner::PlannerResult}; @@ -28,7 +28,7 @@ impl SqlToRel<'_, S> { expr: Box, substring_from: Option>, substring_for: Option>, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { let mut substring_args = match (substring_from, substring_for) { diff --git a/datafusion/sql/src/expr/unary_op.rs b/datafusion/sql/src/expr/unary_op.rs index cd118c0fdd5c5..6d4407cc09860 100644 --- a/datafusion/sql/src/expr/unary_op.rs +++ b/datafusion/sql/src/expr/unary_op.rs @@ -16,7 +16,7 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{DFSchema, Diagnostic, Result, not_impl_err, plan_err}; +use datafusion_common::{DFSchemaRef, Diagnostic, Result, not_impl_err, plan_err}; use datafusion_expr::{ Expr, ExprSchemable, type_coercion::{is_interval, is_timestamp}, @@ -28,7 +28,7 @@ impl SqlToRel<'_, S> { &self, op: UnaryOperator, expr: SQLExpr, - schema: &DFSchema, + schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result { match op { diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs index bd75ac36306fb..b52110bb237f1 100644 --- a/datafusion/sql/src/expr/value.rs +++ b/datafusion/sql/src/expr/value.rs @@ -25,7 +25,7 @@ use arrow::datatypes::{ use bigdecimal::num_bigint::BigInt; use bigdecimal::{BigDecimal, Signed, ToPrimitive}; use datafusion_common::{ - DFSchema, DataFusionError, Result, ScalarValue, internal_datafusion_err, + DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, internal_datafusion_err, not_impl_err, plan_err, }; use datafusion_expr::expr::{BinaryExpr, Placeholder}; @@ -152,7 +152,7 @@ impl SqlToRel<'_, S> { pub(super) fn sql_array_literal( &self, elements: Vec, - schema: &DFSchema, + schema: &DFSchemaRef, ) -> Result { let values = elements .into_iter() diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 520a2d55ef6a2..e53c51a434e19 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -298,8 +298,8 @@ impl PlannerContext { } // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schema.as_ref().map(|s| s.as_ref()) + pub fn outer_query_schema(&self) -> Option<&DFSchemaRef> { + self.outer_query_schema.as_ref() } /// Sets the outer query schema, returning the existing one, if @@ -454,7 +454,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result> { let mut column_defaults = vec![]; // Default expressions are restricted, column references are not allowed - let empty_schema = DFSchema::empty(); + let empty_schema = Arc::new(DFSchema::empty()); let error_desc = |e: DataFusionError| match e { DataFusionError::SchemaError(ref err, _) if matches!(**err, SchemaError::FieldNotFound { .. }) => diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 1b7bb856a592b..57ebbc2999ed4 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -253,7 +253,7 @@ impl SqlToRel<'_, S> { return Ok(input); }; - let empty_schema = DFSchema::empty(); + let empty_schema = Arc::new(DFSchema::empty()); let (skip, fetch, limit_by_exprs) = match limit_clause { LimitClause::LimitOffset { diff --git a/datafusion/sql/src/relation/join.rs b/datafusion/sql/src/relation/join.rs index 8e1a8817309f0..1bd7d937a0369 100644 --- a/datafusion/sql/src/relation/join.rs +++ b/datafusion/sql/src/relation/join.rs @@ -22,6 +22,7 @@ use sqlparser::ast::{ Join, JoinConstraint, JoinOperator, ObjectName, TableFactor, TableWithJoins, }; use std::collections::HashSet; +use std::sync::Arc; impl SqlToRel<'_, S> { pub(crate) fn plan_table_with_joins( @@ -120,7 +121,7 @@ impl SqlToRel<'_, S> { ) -> Result { match constraint { JoinConstraint::On(sql_expr) => { - let join_schema = left.schema().join(right.schema())?; + let join_schema = Arc::new(left.schema().join(right.schema())?); // parse ON expression let expr = self.sql_to_expr(sql_expr, &join_schema, planner_context)?; LogicalPlanBuilder::from(left) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index cef3726c62e40..80ecd52e9895b 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -21,7 +21,8 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ - DFSchema, Diagnostic, Result, Span, Spans, TableReference, not_impl_err, plan_err, + DFSchema, DFSchemaRef, Diagnostic, Result, Span, Spans, TableReference, not_impl_err, + plan_err, }; use datafusion_expr::builder::subquery_alias; use datafusion_expr::planner::{ @@ -53,7 +54,7 @@ impl<'a, 'b, S: ContextProvider> RelationPlannerContext fn sql_to_expr( &mut self, expr: sqlparser::ast::Expr, - schema: &DFSchema, + schema: &DFSchemaRef, ) -> Result { self.planner.sql_to_expr(expr, schema, self.planner_context) } @@ -61,7 +62,7 @@ impl<'a, 'b, S: ContextProvider> RelationPlannerContext fn sql_expr_to_logical_expr( &mut self, expr: sqlparser::ast::Expr, - schema: &DFSchema, + schema: &DFSchemaRef, ) -> Result { self.planner .sql_expr_to_logical_expr(expr, schema, self.planner_context) @@ -148,6 +149,7 @@ impl SqlToRel<'_, S> { if let Some(func_args) = args { let tbl_func_name = name.0.first().unwrap().as_ident().unwrap().to_string(); + let empty_schema = Arc::new(DFSchema::empty()); let args = func_args .args .into_iter() @@ -156,7 +158,7 @@ impl SqlToRel<'_, S> { { self.sql_expr_to_logical_expr( expr, - &DFSchema::empty(), + &empty_schema, planner_context, ) } else { @@ -230,7 +232,7 @@ impl SqlToRel<'_, S> { } // Unnest table factor has empty input - let schema = DFSchema::empty(); + let schema = Arc::new(DFSchema::empty()); let input = LogicalPlanBuilder::empty(true).build()?; // Unnest table factor can have multiple arguments. // We treat each argument as a separate unnest expression. @@ -242,7 +244,7 @@ impl SqlToRel<'_, S> { &schema, planner_context, )?; - Self::check_unnest_arg(&expr, &schema)?; + Self::check_unnest_arg(&expr, schema.as_ref())?; Ok(Expr::Unnest(Unnest::new(expr))) }) .collect::>>()?; @@ -263,8 +265,8 @@ impl SqlToRel<'_, S> { let tbl_func_ref = self.object_name_to_table_reference(name)?; let schema = planner_context .outer_query_schema() - .cloned() - .unwrap_or_else(DFSchema::empty); + .map(Arc::clone) + .unwrap_or_else(|| Arc::new(DFSchema::empty())); let func_args = args .into_iter() .map(|arg| match arg { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1d6ccde6be13a..630a6cf26a56c 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -119,14 +119,14 @@ impl SqlToRel<'_, S> { // Place the fields of the base plan at the front so that when there are references // with the same name, the fields of the base plan will be searched first. // See https://github.com/apache/datafusion/issues/9162 - let mut combined_schema = base_plan.schema().as_ref().clone(); - combined_schema.merge(projected_plan.schema()); + let mut combined_schema = Arc::clone(base_plan.schema()); + Arc::make_mut(&mut combined_schema).merge(projected_plan.schema()); // Order-by expressions prioritize referencing columns from the select list, // then from the FROM clause. let order_by_rex = self.order_by_to_sort_expr( order_by, - projected_plan.schema().as_ref(), + projected_plan.schema(), planner_context, true, Some(base_plan.schema().as_ref()), @@ -640,7 +640,7 @@ impl SqlToRel<'_, S> { let outer_query_schema = planner_context.outer_query_schema().cloned(); let outer_query_schema_vec = outer_query_schema .as_ref() - .map(|schema| vec![schema]) + .map(|schema| vec![schema.as_ref()]) .unwrap_or_else(Vec::new); let filter_expr = diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 14ec64f874c31..5e75e51551b7d 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -847,7 +847,7 @@ impl SqlToRel<'_, S> { "Execute statement with DEFAULT is not supported" ); } - let empty_schema = DFSchema::empty(); + let empty_schema = Arc::new(DFSchema::empty()); let parameters = parameters .into_iter() .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context)) @@ -1232,7 +1232,7 @@ impl SqlToRel<'_, S> { None => None, }; let mut planner_context = PlannerContext::new(); - let empty_schema = &DFSchema::empty(); + let empty_schema = Arc::new(DFSchema::empty()); let args = match args { Some(function_args) => { @@ -1245,7 +1245,7 @@ impl SqlToRel<'_, S> { let default_expr = match arg.default_expr { Some(expr) => Some(self.sql_to_expr( expr, - empty_schema, + &empty_schema, &mut planner_context, )?), None => None, @@ -1315,6 +1315,7 @@ impl SqlToRel<'_, S> { let mut planner_context = PlannerContext::new() .with_prepare_param_data_types(arg_types.unwrap_or_default()); + let function_body_empty_schema = Arc::new(DFSchema::empty()); let function_body = match function_body { Some(r) => Some(self.sql_to_expr( match r { @@ -1334,7 +1335,7 @@ impl SqlToRel<'_, S> { )? } }, - &DFSchema::empty(), + &function_body_empty_schema, &mut planner_context, )?), None => None, @@ -2073,10 +2074,10 @@ impl SqlToRel<'_, S> { // Do a table lookup to verify the table exists let table_ref = self.object_name_to_table_reference(table_name.clone())?; let table_source = self.context_provider.get_table_source(table_ref.clone())?; - let schema = DFSchema::try_from_qualified_schema( + let schema = Arc::new(DFSchema::try_from_qualified_schema( table_ref.clone(), &table_source.schema(), - )?; + )?); let scan = LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)? .build()?; @@ -2087,7 +2088,6 @@ impl SqlToRel<'_, S> { Some(predicate_expr) => { let filter_expr = self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?; - let schema = Arc::new(schema); let mut using_columns = HashSet::new(); expr_to_columns(&filter_expr, &mut using_columns)?; let filter_expr = normalize_col_with_schemas_and_ambiguity_check( @@ -2100,7 +2100,7 @@ impl SqlToRel<'_, S> { }; if let Some(limit) = limit { - let empty_schema = DFSchema::empty(); + let empty_schema = Arc::new(DFSchema::empty()); let limit = self.sql_to_expr(limit, &empty_schema, &mut planner_context)?; source = LogicalPlanBuilder::from(source) .limit_by_expr(None, Some(limit))? diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 4717b843abb53..2cf70c9b3489a 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -98,7 +98,7 @@ fn roundtrip_expr(table: TableReference, sql: &str) -> Result { let state = MockSessionState::default().with_aggregate_function(sum_udaf()); let context = MockContextProvider { state }; let schema = context.get_table_source(table)?.schema(); - let df_schema = DFSchema::try_from(schema)?; + let df_schema = Arc::new(DFSchema::try_from(schema)?); let sql_to_rel = SqlToRel::new(&context); let expr = sql_to_rel.sql_to_expr(sql_expr, &df_schema, &mut PlannerContext::new())?; @@ -1330,7 +1330,7 @@ fn test_pretty_roundtrip() -> Result<()> { Field::new("age", DataType::Utf8, false), ]); - let df_schema = DFSchema::try_from(schema)?; + let df_schema = Arc::new(DFSchema::try_from(schema)?); let context = MockContextProvider { state: MockSessionState::default(),