diff --git a/Cargo.lock b/Cargo.lock index ed90dd25bda7b..5fcdb6c4344a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1819,6 +1819,7 @@ dependencies = [ "datafusion-datasource-parquet", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", @@ -2632,7 +2633,6 @@ dependencies = [ "chrono", "clap", "datafusion", - "datafusion-datasource", "datafusion-spark", "datafusion-substrait", "env_logger", diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 61b55397137df..417bc0d0ac710 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -38,6 +38,7 @@ datafusion-common = { workspace = true, features = ["object_store"] } datafusion-datasource = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-expr-common = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 4f83ec4b3730f..796fca372b5bb 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -325,7 +325,7 @@ pub fn evaluate_partition_prefix<'a>( } } -fn filter_partitions( +pub fn filter_partitioned_file( pf: PartitionedFile, filters: &[Expr], df_schema: &DFSchema, @@ -447,7 +447,7 @@ pub async fn pruned_partition_list<'a>( )) }) .try_filter_map(move |pf| { - futures::future::ready(filter_partitions(pf, filters, &df_schema)) + futures::future::ready(filter_partitioned_file(pf, filters, &df_schema)) }) .boxed()) } diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 0ab15e05abba1..6e4826485805d 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -21,7 +21,7 @@ use datafusion_common::plan_err; use datafusion_datasource::ListingTableUrl; use datafusion_datasource::file_format::FileFormat; use datafusion_execution::config::SessionConfig; -use datafusion_expr::SortExpr; +use datafusion_expr::{Partitioning, SortExpr}; use futures::StreamExt; use futures::TryStreamExt; use itertools::Itertools; @@ -42,8 +42,10 @@ pub struct ListingOptions { /// This can add a lot of overhead as it will usually require files /// to be opened and at least partially parsed. pub collect_stat: bool, - /// Group files to avoid that the number of partitions exceeds - /// this limit + /// Target number of file groups when no output partitioning is declared. + /// + /// If [`Self::output_partitioning`] is set, its partition count is + /// authoritative and this value is ignored for file grouping. pub target_partitions: usize, /// Optional pre-known sort order(s). Must be `SortExpr`s. /// @@ -61,6 +63,24 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// Optional declared output partitioning for this table. + /// + /// This source declaration supports hash and range partitioning. + /// Expressions are logical expressions against the full table schema. This + /// declaration is authoritative over [`Self::target_partitions`]: when set, + /// [`ListingTable`](crate::ListingTable) creates one scan file group per + /// declared output partition instead of using [`Self::target_partitions`]. + /// Empty file groups are added when needed to preserve that count. + /// + /// For range partitioning, split point values are validated against the + /// ordering expression types when planning the scan. Values that can be + /// represented exactly are accepted, but incompatible or lossy split point + /// values are rejected. + /// + /// Files are sorted by path before grouping. DataFusion does not validate + /// that rows match the declaration, so callers must ensure file group `i` + /// contains only rows for declared output partition `i`. + pub output_partitioning: Option, } impl ListingOptions { @@ -78,6 +98,7 @@ impl ListingOptions { collect_stat: false, target_partitions: 1, file_sort_order: vec![], + output_partitioning: None, } } @@ -136,6 +157,27 @@ impl ListingOptions { self } + /// Set declared output partitioning on [`ListingOptions`] and returns self. + /// + /// See [`Self::output_partitioning`]. When the partition count is known, + /// this also aligns [`Self::target_partitions`] to the declared count. + /// Empty file groups are added when needed to preserve the declared + /// partition count. Range split point values must be exactly representable + /// as the ordering expression types when planning the scan. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + if let Some(partition_count) = output_partitioning + .as_ref() + .and_then(Partitioning::partition_count) + { + self.target_partitions = partition_count; + } + self.output_partitioning = output_partitioning; + self + } + /// Set `table partition columns` on [`ListingOptions`] and returns self. /// /// "partition columns," used to support [Hive Partitioning], are @@ -224,6 +266,11 @@ impl ListingOptions { /// Set number of target partitions on [`ListingOptions`] and returns self. /// + /// This controls file grouping when no explicit output partitioning is set. + /// If [`Self::output_partitioning`] is set, its partition count remains + /// authoritative and this value does not change the effective file group + /// count. + /// /// ``` /// # use std::sync::Arc; /// # use datafusion_catalog_listing::ListingOptions; @@ -235,7 +282,11 @@ impl ListingOptions { /// assert_eq!(listing_options.target_partitions, 8); /// ``` pub fn with_target_partitions(mut self, target_partitions: usize) -> Self { - self.target_partitions = target_partitions; + self.target_partitions = self + .output_partitioning + .as_ref() + .and_then(Partitioning::partition_count) + .unwrap_or(target_partitions); self } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index dd3675bd2b39d..d2583323239d0 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -16,14 +16,17 @@ // under the License. use crate::config::SchemaSource; -use crate::helpers::{expr_applicable_for_cols, pruned_partition_list}; +use crate::helpers::{ + expr_applicable_for_cols, filter_partitioned_file, pruned_partition_list, +}; use crate::{ListingOptions, ListingTableConfig}; -use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; use async_trait::async_trait; use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::stats::Precision; use datafusion_common::{ - Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema, + Constraints, DFSchema, ScalarValue, SchemaExt, SplitPoint, Statistics, + internal_datafusion_err, plan_err, project_schema, }; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; @@ -38,8 +41,14 @@ use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; -use datafusion_physical_expr::create_lex_ordering; +use datafusion_expr::{ + Expr, Partitioning as LogicalPartitioning, TableProviderFilterPushDown, TableType, +}; +use datafusion_expr_common::casts::try_cast_literal_to_type; +use datafusion_physical_expr::{ + Partitioning as PhysicalPartitioning, RangePartitioning as PhysicalRangePartitioning, + create_lex_ordering, create_physical_expr, create_physical_sort_expr, +}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; @@ -434,6 +443,125 @@ fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option datafusion_common::Result { + let df_schema = DFSchema::try_from(Arc::clone(schema))?; + match partitioning { + LogicalPartitioning::RoundRobinBatch(_) => { + datafusion_common::not_impl_err!( + "RoundRobinBatch output partitioning is not supported for ListingTable" + ) + } + LogicalPartitioning::Hash(exprs, partition_count) => { + let exprs = exprs + .iter() + .map(|expr| create_physical_expr(expr, &df_schema, execution_props)) + .collect::>>()?; + Ok(PhysicalPartitioning::Hash(exprs, *partition_count)) + } + LogicalPartitioning::Range(range) => { + let ordering = range + .ordering() + .iter() + .map(|expr| create_physical_sort_expr(expr, &df_schema, execution_props)) + .collect::>>()?; + let Some(ordering) = LexOrdering::new(ordering) else { + return plan_err!( + "Range output partitioning must have at least one ordering expression" + ); + }; + let split_points = + normalize_range_split_points(&ordering, range.split_points(), schema)?; + let range = PhysicalRangePartitioning::try_new(ordering, split_points)?; + Ok(PhysicalPartitioning::Range(range)) + } + LogicalPartitioning::DistributeBy(_) => { + datafusion_common::not_impl_err!( + "DistributeBy output partitioning is not supported for ListingTable" + ) + } + } +} + +fn normalize_range_split_points( + ordering: &LexOrdering, + split_points: &[SplitPoint], + schema: &SchemaRef, +) -> datafusion_common::Result> { + split_points + .iter() + .enumerate() + .map(|(split_idx, split_point)| { + let values = split_point + .values() + .iter() + .zip(ordering.iter()) + .enumerate() + .map(|(value_idx, (value, sort_expr))| { + let target_type = sort_expr.expr.data_type(schema.as_ref())?; + normalize_range_split_point_value( + value, + &target_type, + split_idx, + value_idx, + ) + }) + .collect::>>()?; + Ok(SplitPoint::new(values)) + }) + .collect() +} + +fn normalize_range_split_point_value( + value: &ScalarValue, + target_type: &DataType, + split_idx: usize, + value_idx: usize, +) -> datafusion_common::Result { + let value_type = value.data_type(); + if &value_type == target_type { + return Ok(value.clone()); + } + + if let Some(value) = try_cast_range_split_point_value_exact(value, target_type) { + return Ok(value); + } + + plan_err!( + "Range output partitioning split point {split_idx} value {value_idx} with type {value_type} cannot be represented exactly as ordering expression type {target_type}" + ) +} + +fn try_cast_range_split_point_value_exact( + value: &ScalarValue, + target_type: &DataType, +) -> Option { + let casted = try_cast_literal_to_type(value, target_type)?; + + // Range partitioning split points are authoritative physical metadata, so + // the normalized value must not advertise a different boundary. + let round_trip = try_cast_literal_to_type(&casted, &value.data_type())?; + + (round_trip == *value).then_some(casted) +} + +fn filter_file_group_by_partition_filters( + file_group: FileGroup, + filters: &[Expr], + df_schema: &DFSchema, +) -> datafusion_common::Result { + let files = file_group + .into_inner() + .into_iter() + .map(|file| filter_partitioned_file(file, filters, df_schema)) + .filter_map(Result::transpose) + .collect::>>()?; + Ok(FileGroup::new(files)) +} + // Expressions can be used for partition pruning if they can be evaluated using // only the partition columns and there are partition columns. fn can_be_evaluated_for_partition_pruning( @@ -503,7 +631,13 @@ impl TableProvider for ListingTable { // We should not limit the number of partitioned files to scan if there are filters and limit // at the same time. This is because the limit should be applied after the filters are applied. + let declared_output_partitioning = self.options.output_partitioning.clone(); + let statistic_file_limit = if filters.is_empty() { limit } else { None }; + let target_partitions = declared_output_partitioning + .as_ref() + .and_then(LogicalPartitioning::partition_count) + .unwrap_or(self.options.target_partitions); let ListFilesResult { file_groups: mut partitioned_file_lists, @@ -523,17 +657,19 @@ impl TableProvider for ListingTable { state.execution_props(), &partitioned_file_lists, )?; - match state - .config_options() - .execution - .split_file_groups_by_statistics + let split_file_groups_by_statistics = declared_output_partitioning.is_none() + && state + .config_options() + .execution + .split_file_groups_by_statistics; + match split_file_groups_by_statistics .then(|| { output_ordering.first().map(|output_ordering| { FileScanConfig::split_groups_by_statistics_with_target_partitions( &self.table_schema, &partitioned_file_lists, output_ordering, - self.options.target_partitions, + target_partitions, ) }) }) @@ -541,7 +677,7 @@ impl TableProvider for ListingTable { { Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), Some(Ok(new_groups)) => { - if new_groups.len() <= self.options.target_partitions { + if new_groups.len() <= target_partitions { partitioned_file_lists = new_groups; } else { log::debug!( @@ -552,6 +688,26 @@ impl TableProvider for ListingTable { None => {} // no ordering required }; + let output_partitioning = if let Some(output_partitioning) = + &declared_output_partitioning + { + let output_partitioning = create_physical_output_partitioning( + output_partitioning, + &self.table_schema, + state.execution_props(), + )?; + let partition_count = output_partitioning.partition_count(); + if partitioned_file_lists.len() != partition_count { + return plan_err!( + "ListingTable output_partitioning has {partition_count} partitions, but the scan has {} file groups", + partitioned_file_lists.len() + ); + } + Some(output_partitioning) + } else { + None + }; + let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { @@ -561,24 +717,23 @@ impl TableProvider for ListingTable { }; let file_source = self.create_file_source(); + let scan_config = FileScanConfigBuilder::new(object_store_url, file_source) + .with_file_groups(partitioned_file_lists) + .with_constraints(self.constraints.clone()) + .with_statistics(statistics) + .with_projection_indices(projection)? + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_output_partitioning(output_partitioning) + .with_expr_adapter(self.expr_adapter_factory.clone()) + .with_partitioned_by_file_group(partitioned_by_file_group) + .build(); // create the execution plan let plan = self .options .format - .create_physical_plan( - state, - FileScanConfigBuilder::new(object_store_url, file_source) - .with_file_groups(partitioned_file_lists) - .with_constraints(self.constraints.clone()) - .with_statistics(statistics) - .with_projection_indices(projection)? - .with_limit(limit) - .with_output_ordering(output_ordering) - .with_expr_adapter(self.expr_adapter_factory.clone()) - .with_partitioned_by_file_group(partitioned_by_file_group) - .build(), - ) + .create_physical_plan(state, scan_config) .await?; Ok(ScanResult::new(plan)) @@ -690,12 +845,29 @@ impl ListingTable { /// Get the list of files for a scan as well as the file level statistics. /// The list is grouped to let the execution plan know how the files should /// be distributed to different threads / executors. + /// + /// If [`ListingOptions::output_partitioning`] is set, the returned file + /// groups preserve that declared partition count, including empty trailing + /// groups when needed, rather than using + /// [`ListingOptions::target_partitions`]. pub async fn list_files_for_scan<'a>( &'a self, ctx: &'a dyn Session, filters: &'a [Expr], limit: Option, ) -> datafusion_common::Result { + let declared_output_partitioning = self.options.output_partitioning.as_ref(); + let target_partitions = declared_output_partitioning + .and_then(LogicalPartitioning::partition_count) + .unwrap_or(self.options.target_partitions); + let has_declared_partitioning = declared_output_partitioning.is_some(); + + if target_partitions == 0 { + return plan_err!( + "ListingTable requires target_partitions to be greater than zero" + ); + } + let store = if let Some(url) = self.table_paths.first() { ctx.runtime_env().object_store(url)? } else { @@ -705,13 +877,23 @@ impl ListingTable { grouped_by_partition: false, }); }; + // Listing-time pruning changes the file set before `split_files` + // assigns files to output partitions. For declared output partitioning, + // list the full file set, split into declared groups, then prune within + // each group below. + let listing_time_filters = if has_declared_partitioning { + &[] + } else { + filters + }; + // list files (with partitions) let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { pruned_partition_list( ctx, store.as_ref(), table_path, - filters, + listing_time_filters, &self.options.file_extension, &self.options.table_partition_cols, ) @@ -737,8 +919,13 @@ impl ListingTable { .boxed() .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency); + let file_limit = if has_declared_partitioning && !filters.is_empty() { + None + } else { + limit + }; let (file_group, inexact_stats) = - get_files_with_limit(files, limit, self.options.collect_stat).await?; + get_files_with_limit(files, file_limit, self.options.collect_stat).await?; // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N // @@ -747,27 +934,46 @@ impl ListingTable { // hash repartitioning for aggregates and joins on partition columns. let threshold = ctx.config_options().optimizer.preserve_file_partitions; - let (file_groups, grouped_by_partition) = if threshold > 0 - && !self.options.table_partition_cols.is_empty() - { - let grouped = - file_group.group_by_partition_values(self.options.target_partitions); + let (mut file_groups, grouped_by_partition) = if has_declared_partitioning { + (file_group.split_files(target_partitions), false) + } else if threshold > 0 && !self.options.table_partition_cols.is_empty() { + let grouped = file_group.group_by_partition_values(target_partitions); if grouped.len() >= threshold { (grouped, true) } else { let all_files: Vec<_> = grouped.into_iter().flat_map(|g| g.into_inner()).collect(); ( - FileGroup::new(all_files).split_files(self.options.target_partitions), + FileGroup::new(all_files).split_files(target_partitions), false, ) } } else { - ( - file_group.split_files(self.options.target_partitions), - false, - ) + (file_group.split_files(target_partitions), false) }; + if has_declared_partitioning && !file_groups.is_empty() { + file_groups.resize_with(target_partitions, || FileGroup::new(vec![])); + } + + if has_declared_partitioning && !filters.is_empty() { + let df_schema = DFSchema::from_unqualified_fields( + self.options + .table_partition_cols + .iter() + .map(|(name, data_type)| Field::new(name, data_type.clone(), true)) + .collect(), + Default::default(), + )?; + + file_groups = file_groups + .into_iter() + .map(|file_group| { + filter_file_group_by_partition_filters( + file_group, filters, &df_schema, + ) + }) + .collect::>>()?; + } let (file_groups, stats) = compute_all_files_statistics( file_groups, @@ -917,6 +1123,8 @@ async fn get_files_with_limit( mod tests { use super::*; use arrow::compute::SortOptions; + use arrow::datatypes::TimeUnit; + use datafusion_common::assert_contains; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; @@ -946,6 +1154,35 @@ mod tests { PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering) } + #[test] + fn test_normalize_range_split_point_rejects_lossy_timestamp_cast() { + let result = normalize_range_split_point_value( + &ScalarValue::TimestampNanosecond(Some(123456), None), + &DataType::Timestamp(TimeUnit::Second, None), + 0, + 0, + ); + + assert!(result.is_err()); + assert_contains!( + result.unwrap_err().strip_backtrace(), + "cannot be represented exactly" + ); + } + + #[test] + fn test_normalize_range_split_point_accepts_exact_timestamp_cast() { + let result = normalize_range_split_point_value( + &ScalarValue::TimestampNanosecond(Some(123_000_000_000), None), + &DataType::Timestamp(TimeUnit::Second, None), + 0, + 0, + ) + .unwrap(); + + assert_eq!(result, ScalarValue::TimestampSecond(Some(123), None)); + } + #[test] fn test_derive_common_ordering_all_files_same_ordering() { // All files have the same ordering -> returns that ordering diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d14ec1f56dce2..8d3642a5a575a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -139,12 +139,17 @@ mod tests { use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_format::FileFormat; use datafusion_expr::dml::InsertOp; - use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; + use datafusion_expr::{ + BinaryExpr, LogicalPlanBuilder, Operator, Partitioning as LogicalPartitioning, + RangePartitioning as LogicalRangePartitioning, + }; use datafusion_physical_expr::PhysicalSortExpr; - use datafusion_physical_expr::expressions::binary; + use datafusion_physical_expr::expressions::{Column, binary}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::empty::EmptyExec; - use datafusion_physical_plan::{ExecutionPlanProperties, collect}; + use datafusion_physical_plan::{ + ExecutionPlanProperties, Partitioning, RangePartitioning, SplitPoint, collect, + }; use std::collections::HashMap; use std::io::Write; use std::sync::Arc; @@ -1286,6 +1291,222 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_list_files_uses_declared_output_partitioning_count() -> Result<()> { + let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_target_partitions(1) + .with_output_partitioning(Some(LogicalPartitioning::Range( + LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + )?, + ))); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let result = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let group_sizes = result + .file_groups + .iter() + .map(|group| group.len()) + .collect::>(); + + assert_eq!(group_sizes, vec![1, 1, 0, 0]); + + Ok(()) + } + + #[test] + fn test_listing_options_output_partitioning_overrides_target_partitions() -> Result<()> + { + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + )?); + + let output_partitioning_first = + ListingOptions::new(Arc::new(JsonFormat::default())) + .with_output_partitioning(Some(output_partitioning.clone())) + .with_target_partitions(1); + assert_eq!(output_partitioning_first.target_partitions, 4); + + let target_partitions_first = + ListingOptions::new(Arc::new(JsonFormat::default())) + .with_target_partitions(1) + .with_output_partitioning(Some(output_partitioning)); + assert_eq!(target_partitions_first.target_partitions, 4); + + Ok(()) + } + + #[tokio::test] + async fn test_range_output_partitioning_normalizes_split_point_types() -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )?); + let expected_output_partitioning = + Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int64(Some(10))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let scan = table.scan(&ctx.state(), None, &[], None).await?; + assert_eq!(scan.output_partitioning(), &expected_output_partitioning); + + Ok(()) + } + + #[tokio::test] + async fn test_range_output_partitioning_rejects_invalid_split_point_type() + -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Utf8(Some( + "not-an-int".to_string(), + ))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Range output partitioning split point 0 value 0 with type Utf8 cannot be represented exactly as ordering expression type Int32" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_partition_filter_preserves_declared_output_partitioning() -> Result<()> + { + let files = ["bucket/test/pid=1/file1", "bucket/test/pid=2/file2"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("pid").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + )?); + let expected_output_partitioning = + Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("pid", 1)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]) + .with_output_partitioning(Some(output_partitioning.clone())); + + let table_path = ListingTableUrl::parse("test:///bucket/test/")?; + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let unfiltered = table.scan(&ctx.state(), None, &[], None).await?; + assert_eq!( + unfiltered.output_partitioning(), + &expected_output_partitioning + ); + + let filter = Expr::eq(col("pid"), lit(2_i32)); + let file_groups = table + .list_files_for_scan(&ctx.state(), std::slice::from_ref(&filter), None) + .await? + .file_groups + .into_iter() + .map(|group| { + group + .into_inner() + .into_iter() + .map(|file| file.path().to_string()) + .collect::>() + }) + .collect::>(); + assert_eq!( + file_groups, + vec![ + Vec::::new(), + vec!["bucket/test/pid=2/file2".to_string()] + ] + ); + + let filtered = table.scan(&ctx.state(), None, &[filter], None).await?; + assert_eq!( + filtered.output_partitioning(), + &expected_output_partitioning + ); + + Ok(()) + } + #[tokio::test] async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> { let files = [ diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 3ebd588a0770f..65771b44bc5d1 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -40,7 +40,7 @@ use datafusion_expr::Operator; use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; -use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionMapping}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -206,6 +206,13 @@ pub struct FileScanConfig { /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. pub partitioned_by_file_group: bool, + /// Optional declared output partitioning of this file scan. + /// + /// Expressions are in terms of the full table schema, before scan + /// projection or filtering. `ListingTable` validates the partition count + /// before building the scan; direct mismatches fall back to + /// [`Partitioning::UnknownPartitioning`]. + pub output_partitioning: Option, } /// A builder for [`FileScanConfig`]'s. @@ -274,6 +281,7 @@ pub struct FileScanConfigBuilder { file_groups: Vec, statistics: Option, output_ordering: Vec, + output_partitioning: Option, file_compression_type: Option, batch_size: Option, expr_adapter_factory: Option>, @@ -297,6 +305,7 @@ impl FileScanConfigBuilder { file_groups: vec![], statistics: None, output_ordering: vec![], + output_partitioning: None, file_compression_type: None, limit: None, preserve_order: false, @@ -463,6 +472,17 @@ impl FileScanConfigBuilder { self } + /// Set declared output partitioning for this scan. + /// + /// See [`FileScanConfig::output_partitioning`]. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + self.output_partitioning = output_partitioning; + self + } + /// Set the file compression type pub fn with_file_compression_type( mut self, @@ -521,6 +541,7 @@ impl FileScanConfigBuilder { file_groups, statistics, output_ordering, + output_partitioning, file_compression_type, batch_size, expr_adapter_factory: expr_adapter, @@ -550,6 +571,7 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, + output_partitioning, } } } @@ -562,6 +584,7 @@ impl From for FileScanConfigBuilder { file_groups: config.file_groups, statistics: Some(config.statistics), output_ordering: config.output_ordering, + output_partitioning: config.output_partitioning, file_compression_type: Some(config.file_compression_type), limit: config.limit, preserve_order: config.preserve_order, @@ -573,6 +596,52 @@ impl From for FileScanConfigBuilder { } } +fn hash_partitioning_from_partition_fields( + schema: &Schema, + partition_cols: &Fields, + partition_count: usize, +) -> Option { + if partition_cols.is_empty() { + return None; + } + + let mut exprs: Vec> = Vec::with_capacity(partition_cols.len()); + for partition_col in partition_cols { + let name = partition_col.name(); + let idx = schema + .fields() + .iter() + .position(|field| field.name() == name)?; + exprs.push(Arc::new(Column::new(name, idx))); + } + + Some(Partitioning::Hash(exprs, partition_count)) +} + +fn project_output_partitioning( + partitioning: &Partitioning, + mapping: &ProjectionMapping, + input_schema: &SchemaRef, + partition_count: usize, +) -> Partitioning { + let input_eq_properties = EquivalenceProperties::new(Arc::clone(input_schema)); + match partitioning { + Partitioning::Hash(exprs, _) => { + let projected_exprs = input_eq_properties + .project_expressions(exprs, mapping) + .collect::>>(); + projected_exprs + .map(|exprs| Partitioning::Hash(exprs, partition_count)) + .unwrap_or_else(|| Partitioning::UnknownPartitioning(partition_count)) + } + Partitioning::Range(_) + | Partitioning::RoundRobinBatch(_) + | Partitioning::UnknownPartitioning(_) => { + partitioning.project(mapping, &input_eq_properties) + } + } +} + impl DataSource for FileScanConfig { fn open( &self, @@ -660,6 +729,10 @@ impl DataSource for FileScanConfig { display_orderings(f, &orderings)?; + if self.output_partitioning.is_some() { + write!(f, ", output_partitioning={}", self.output_partitioning())?; + } + if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; } @@ -683,10 +756,9 @@ impl DataSource for FileScanConfig { repartition_file_min_size: usize, output_ordering: Option, ) -> Result>> { - // When files are grouped by partition values, we cannot allow byte-range - // splitting. It would mix rows from different partition values across - // file groups, breaking the Hash partitioning. - if self.partitioned_by_file_group { + // When file groups define output partitioning, repartitioning files + // would invalidate the partition-to-file-group mapping. + if self.output_partitioning.is_some() || self.partitioned_by_file_group { return Ok(None); } @@ -702,54 +774,63 @@ impl DataSource for FileScanConfig { /// Returns the output partitioning for this file scan. /// - /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on - /// the Hive partition columns, allowing the optimizer to skip hash repartitioning - /// for aggregates and joins on those columns. + /// When output partitioning is declared, this returns it after remapping + /// through the scan projection. If the declared partition count does not + /// match the number of file groups, this logs a warning and returns + /// [`Partitioning::UnknownPartitioning`] to avoid advertising an invalid + /// partitioning property. Otherwise, when `partitioned_by_file_group` is + /// true, this returns `Partitioning::Hash` on the Hive partition columns, + /// allowing the optimizer to skip repartitioning for compatible aggregates + /// and joins. /// /// Tradeoffs - /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with - /// `GROUP BY` or `ORDER BY` on partition columns. - /// - Cost: Files are grouped by partition values rather than split by byte - /// ranges, which may reduce I/O parallelism when partition sizes are uneven. - /// For simple aggregations without `ORDER BY`, this cost may outweigh the benefit. + /// - Benefit: Eliminates `RepartitionExec` for compatible queries. + /// - Cost: File groups must remain intact, so byte-range file splitting + /// and sibling work stealing are disabled. /// /// Follow-up Work - /// - Idea: Could allow byte-range splitting within partition-aware groups, + /// - Idea: Could allow byte-range splitting within each output partition, /// preserving I/O parallelism while maintaining partition semantics. fn output_partitioning(&self) -> Partitioning { - if self.partitioned_by_file_group { - let partition_cols = self.table_partition_cols(); - if !partition_cols.is_empty() { - let projected_schema = match self.projected_schema() { - Ok(schema) => schema, - Err(_) => { - debug!( - "Could not get projected schema, falling back to UnknownPartitioning." - ); - return Partitioning::UnknownPartitioning(self.file_groups.len()); - } - }; - - // Build Column expressions for partition columns based on their - // position in the projected schema - let mut exprs: Vec> = Vec::new(); - for partition_col in partition_cols { - if let Some((idx, _)) = projected_schema - .fields() - .iter() - .enumerate() - .find(|(_, f)| f.name() == partition_col.name()) - { - exprs.push(Arc::new(Column::new(partition_col.name(), idx))); - } - } + let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| { + self.partitioned_by_file_group.then(|| { + hash_partitioning_from_partition_fields( + self.file_source.table_schema().table_schema(), + self.table_partition_cols(), + self.file_groups.len(), + ) + })? + }) else { + return Partitioning::UnknownPartitioning(self.file_groups.len()); + }; + if output_partitioning.partition_count() != self.file_groups.len() { + warn!( + "Declared output partitioning has {} partitions, but file scan has {} file groups. Falling back to UnknownPartitioning.", + output_partitioning.partition_count(), + self.file_groups.len() + ); + return Partitioning::UnknownPartitioning(self.file_groups.len()); + } - if exprs.len() == partition_cols.len() { - return Partitioning::Hash(exprs, self.file_groups.len()); + if let Some(projection) = self.file_source.projection() { + let schema = self.file_source.table_schema().table_schema(); + return match projection.projection_mapping(schema) { + Ok(mapping) => project_output_partitioning( + &output_partitioning, + &mapping, + schema, + self.file_groups.len(), + ), + Err(e) => { + debug!( + "Could not project output partitioning, falling back to UnknownPartitioning: {e}" + ); + Partitioning::UnknownPartitioning(self.file_groups.len()) } - } + }; } - Partitioning::UnknownPartitioning(self.file_groups.len()) + + output_partitioning } /// Computes the effective equivalence properties of this file scan, taking @@ -1043,7 +1124,10 @@ impl DataSource for FileScanConfig { /// when file order must be preserved or the file groups define the output /// partitioning needed for the rest of the plan fn create_sibling_state(&self) -> Option> { - if self.preserve_order || self.partitioned_by_file_group { + if self.preserve_order + || self.output_partitioning.is_some() + || self.partitioned_by_file_group + { return None; } @@ -2457,6 +2541,58 @@ mod tests { assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_))); } + #[test] + fn test_declared_output_partitioning_projects_with_scan() { + let file_schema = aggr_test_schema(); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4); + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![1, 2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = Some(output_partitioning); + + match config.output_partitioning() { + Partitioning::Hash(exprs, num_partitions) => { + assert_eq!(num_partitions, 4); + assert_eq!(exprs.len(), 1); + let column = exprs[0].downcast_ref::().unwrap(); + assert_eq!(column.name(), "c2"); + assert_eq!(column.index(), 0); + } + _ => panic!("Expected Hash partitioning"), + } + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = + Some(Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4)); + + assert!(matches!( + config.output_partitioning(), + Partitioning::UnknownPartitioning(4) + )); + } + #[test] fn test_output_partitioning_no_partition_columns() { let file_schema = aggr_test_schema(); diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index 322395ab3728c..093005028e7e1 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -1131,6 +1131,7 @@ message FileScanExecConf { optional ProjectionExprs projection_exprs = 13; optional bool partitioned_by_file_group = 14; + optional Partitioning output_partitioning = 15; } message ParquetScanExecNode { diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index 1eb9de00fb362..65cc443b3af51 100644 --- a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -6945,6 +6945,9 @@ impl serde::Serialize for FileScanExecConf { if self.partitioned_by_file_group.is_some() { len += 1; } + if self.output_partitioning.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -6984,6 +6987,9 @@ impl serde::Serialize for FileScanExecConf { if let Some(v) = self.partitioned_by_file_group.as_ref() { struct_ser.serialize_field("partitionedByFileGroup", v)?; } + if let Some(v) = self.output_partitioning.as_ref() { + struct_ser.serialize_field("outputPartitioning", v)?; + } struct_ser.end() } } @@ -7013,6 +7019,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "projectionExprs", "partitioned_by_file_group", "partitionedByFileGroup", + "output_partitioning", + "outputPartitioning", ]; #[allow(clippy::enum_variant_names)] @@ -7029,6 +7037,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { BatchSize, ProjectionExprs, PartitionedByFileGroup, + OutputPartitioning, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7062,6 +7071,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), "projectionExprs" | "projection_exprs" => Ok(GeneratedField::ProjectionExprs), "partitionedByFileGroup" | "partitioned_by_file_group" => Ok(GeneratedField::PartitionedByFileGroup), + "outputPartitioning" | "output_partitioning" => Ok(GeneratedField::OutputPartitioning), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7093,6 +7103,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut batch_size__ = None; let mut projection_exprs__ = None; let mut partitioned_by_file_group__ = None; + let mut output_partitioning__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -7172,6 +7183,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } partitioned_by_file_group__ = map_.next_value()?; } + GeneratedField::OutputPartitioning => { + if output_partitioning__.is_some() { + return Err(serde::de::Error::duplicate_field("outputPartitioning")); + } + output_partitioning__ = map_.next_value()?; + } } } Ok(FileScanExecConf { @@ -7187,6 +7204,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { batch_size: batch_size__, projection_exprs: projection_exprs__, partitioned_by_file_group: partitioned_by_file_group__, + output_partitioning: output_partitioning__, }) } } diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index 3ac04a6164db8..7f66114b0e15d 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -1695,6 +1695,8 @@ pub struct FileScanExecConf { pub projection_exprs: ::core::option::Option, #[prost(bool, optional, tag = "14")] pub partitioned_by_file_group: ::core::option::Option, + #[prost(message, optional, tag = "15")] + pub output_partitioning: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 36751d8a61a3e..04431a779f8e2 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -610,6 +610,12 @@ pub fn parse_protobuf_file_scan_config( )?; output_ordering.extend(LexOrdering::new(sort_exprs)); } + let output_partitioning = parse_protobuf_partitioning( + proto.output_partitioning.as_ref(), + ctx, + &schema, + proto_converter, + )?; // Parse projection expressions if present and apply to file source let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs { @@ -638,15 +644,18 @@ pub fn parse_protobuf_file_scan_config( file_source }; - let config = FileScanConfigBuilder::new(object_store_url, file_source) + let mut config_builder = FileScanConfigBuilder::new(object_store_url, file_source) .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) - .with_batch_size(proto.batch_size.map(|s| s as usize)) - .with_partitioned_by_file_group(proto.partitioned_by_file_group.unwrap_or(false)) - .build(); + .with_output_partitioning(output_partitioning) + .with_batch_size(proto.batch_size.map(|s| s as usize)); + if proto.partitioned_by_file_group.unwrap_or(false) { + config_builder = config_builder.with_partitioned_by_file_group(true); + } + let config = config_builder.build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index d9315af431e22..8297d67583c86 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -532,6 +532,11 @@ pub fn serialize_file_scan_config( serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?; output_orderings.push(ordering) } + let output_partitioning = conf + .output_partitioning + .as_ref() + .map(|partitioning| serialize_partitioning(partitioning, codec, proto_converter)) + .transpose()?; // Fields must be added to the schema so that they can persist in the protobuf, // and then they are to be removed from the schema in `parse_protobuf_file_scan_config` @@ -592,6 +597,7 @@ pub fn serialize_file_scan_config( batch_size: conf.batch_size.map(|s| s as u64), projection_exprs, partitioned_by_file_group: Some(conf.partitioned_by_file_group), + output_partitioning, }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8e80467788598..270dd485adf3d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -4081,22 +4081,27 @@ fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> { } #[test] -fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { +fn roundtrip_parquet_exec_output_partitioning() -> Result<()> { use datafusion::datasource::physical_plan::FileScanConfig; let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("col", 0))], 1); let scan_config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( "/path/to/file.parquet".to_string(), 1024, )])]) - .with_partitioned_by_file_group(true) + .with_output_partitioning(Some(output_partitioning.clone())) .build(); - assert!(scan_config.partitioned_by_file_group); + assert_eq!( + scan_config.output_partitioning, + Some(output_partitioning.clone()) + ); let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); @@ -4123,7 +4128,72 @@ fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { .downcast_ref::() .expect("Expected FileScanConfig"); - assert!(file_scan_config.partitioned_by_file_group); + assert_eq!( + file_scan_config.output_partitioning, + Some(output_partitioning) + ); + + Ok(()) +} + +#[test] +fn roundtrip_parquet_exec_range_output_partitioning() -> Result<()> { + use datafusion::datasource::physical_plan::FileScanConfig; + + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = Partitioning::Range(RangePartitioning::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "col", 0, + )))]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![ + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-1.parquet".to_string(), + 1024, + )]), + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-2.parquet".to_string(), + 1024, + )]), + ]) + .with_output_partitioning(Some(output_partitioning.clone())) + .build(); + + let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); + + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&exec_plan), + &codec, + &proto_converter, + )?; + let result_plan = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), + ctx.task_ctx().as_ref(), + &codec, + &proto_converter, + )?; + + let data_source_exec = result_plan + .downcast_ref::() + .expect("Expected DataSourceExec"); + let file_scan_config = data_source_exec + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + assert_eq!( + file_scan_config.output_partitioning, + Some(output_partitioning) + ); Ok(()) } diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index a642fbe22a6e3..e2ffe1415a1fb 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -47,7 +47,6 @@ bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.60", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true, features = ["avro"] } -datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true, optional = true } futures = { workspace = true } diff --git a/datafusion/sqllogictest/src/test_context/range_partitioning.rs b/datafusion/sqllogictest/src/test_context/range_partitioning.rs index 88e49708baf60..a3e16eefd881a 100644 --- a/datafusion/sqllogictest/src/test_context/range_partitioning.rs +++ b/datafusion/sqllogictest/src/test_context/range_partitioning.rs @@ -15,236 +15,94 @@ // specific language governing permissions and limitations // under the License. -use std::fmt; +use std::fs::{create_dir_all, remove_dir_all, write}; +use std::path::Path; use std::sync::Arc; -use arrow::array::Int32Array; -use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use datafusion::catalog::Session; -use datafusion::common::{Result, ScalarValue, project_schema}; -use datafusion::datasource::source::{DataSource, DataSourceExec}; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::Expr; -use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_expr::expressions::col as physical_col; -use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion::physical_plan::execution_plan::SchedulingType; -use datafusion::physical_plan::projection::ProjectionExprs; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, - SendableRecordBatchStream, SplitPoint, Statistics, +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::{ScalarValue, SplitPoint}; +use datafusion::datasource::file_format::csv::CsvFormat; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; +use datafusion::logical_expr::{Partitioning, RangePartitioning, col}; use datafusion::prelude::SessionContext; -use datafusion_datasource::memory::MemorySourceConfig; // ============================================================================== // Range Partitioned Table (sqllogictest-only) // ============================================================================== -/// Simple range-partitioned table for testing before declaring such tables is -/// supported via SQL. -#[derive(Debug)] -struct RangePartitionedTable { - schema: SchemaRef, - partitions: Vec>, - range_column_index: usize, - split_points: Vec, -} - -#[async_trait] -impl TableProvider for RangePartitionedTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - let projected_schema = project_schema(&self.schema, projection)?; - let mut source = MemorySourceConfig::try_new( - &self.partitions, - Arc::clone(&self.schema), - projection.cloned(), - )?; - source = source.with_show_sizes(state.config_options().explain.show_sizes); - - let output_partitioning = - self.output_partitioning(projection, &projected_schema)?; - let source = RangePartitionedSource { - inner: source, - output_partitioning, - }; - - Ok(DataSourceExec::from_data_source(source)) - } -} - -impl RangePartitionedTable { - fn output_partitioning( - &self, - projection: Option<&Vec>, - projected_schema: &SchemaRef, - ) -> Result { - let Some(projected_range_index) = - projected_index(self.range_column_index, projection) - else { - return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); - }; - - let range_column = projected_schema.field(projected_range_index).name(); - let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - physical_col(range_column, projected_schema)?, - SortOptions::default(), - )]) - .expect("range ordering should not be empty"); - - Ok(Partitioning::Range(RangePartitioning::try_new( - ordering, - self.split_points.clone(), - )?)) - } -} - -fn projected_index( - column_index: usize, - projection: Option<&Vec>, -) -> Option { - projection - .map(|projection| projection.iter().position(|idx| *idx == column_index)) - .unwrap_or(Some(column_index)) -} - -#[derive(Clone, Debug)] -struct RangePartitionedSource { - inner: MemorySourceConfig, - output_partitioning: Partitioning, -} - -impl DataSource for RangePartitionedSource { - fn open( - &self, - partition: usize, - context: Arc, - ) -> Result { - self.inner.open(partition, context) - } - - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - self.inner.fmt_as(t, f)?; - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, ", output_partitioning={}", self.output_partitioning) - } - DisplayFormatType::TreeRender => Ok(()), - } - } - - fn output_partitioning(&self) -> Partitioning { - self.output_partitioning.clone() - } - - fn eq_properties(&self) -> EquivalenceProperties { - self.inner.eq_properties() - } - - fn scheduling_type(&self) -> SchedulingType { - self.inner.scheduling_type() - } - - fn partition_statistics(&self, partition: Option) -> Result> { - self.inner.partition_statistics(partition) - } - - fn with_fetch(&self, limit: Option) -> Option> { - Some(Arc::new(Self { - inner: self.inner.clone().with_limit(limit), - output_partitioning: self.output_partitioning.clone(), - })) - } - - fn fetch(&self) -> Option { - self.inner.fetch() - } - - fn try_swapping_with_projection( - &self, - _projection: &ProjectionExprs, - ) -> Result>> { - // Range partitioning metadata is projection-sensitive. This fixture - // computes it in TableProvider::scan, so do not rewrite later - // ProjectionExec nodes into the source. - Ok(None) - } -} - +/// Registers a simple range-partitioned listing table for testing before +/// declaring such tables is supported via SQL. pub(super) fn register_range_partitioned_table(ctx: &SessionContext) { let schema = Arc::new(Schema::new(vec![ Field::new("range_key", DataType::Int32, false), Field::new("non_range_key", DataType::Int32, false), Field::new("value", DataType::Int32, false), ])); - let partitions = vec![ - vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], - vec![range_partition_batch( - &schema, - &[10, 15], - &[1, 2], - &[100, 150], - )], - vec![range_partition_batch( - &schema, - &[20, 25], - &[1, 2], - &[200, 250], - )], - vec![range_partition_batch( - &schema, - &[30, 35], - &[1, 2], - &[300, 350], - )], - ]; - let split_points = vec![ - SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), - ]; - let table = RangePartitionedTable { + let output_partitioning = Partitioning::Range( + RangePartitioning::try_new( + vec![col("range_key").sort(true, true)], + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + ) + .expect("range partitioning should be valid"), + ); + + register_csv_listing_table( + ctx, + "range_partitioned", + Path::new(env!("CARGO_MANIFEST_DIR")) + .join("test_files/scratch_range_partitioning/range_partitioned"), schema, - partitions, - range_column_index: 0, - split_points, - }; - - ctx.register_table("range_partitioned", Arc::new(table)) - .expect("range partitioned table registration should succeed"); + [ + "1,1,10\n5,2,50\n", + "10,1,100\n15,2,150\n", + "20,1,200\n25,2,250\n", + "30,1,300\n35,2,350\n", + ], + Some(output_partitioning), + ); } -fn range_partition_batch( - schema: &SchemaRef, - range_key: &[i32], - non_range_key: &[i32], - value: &[i32], -) -> RecordBatch { - RecordBatch::try_new( - Arc::clone(schema), - vec![ - Arc::new(Int32Array::from(range_key.to_vec())), - Arc::new(Int32Array::from(non_range_key.to_vec())), - Arc::new(Int32Array::from(value.to_vec())), - ], - ) - .expect("range partition batch should be valid") +fn register_csv_listing_table( + ctx: &SessionContext, + name: &str, + table_dir: impl AsRef, + schema: Arc, + partitions: impl IntoIterator, + output_partitioning: Option, +) { + let table_dir = table_dir.as_ref(); + if table_dir.exists() { + remove_dir_all(table_dir).expect("test table dir should be removable"); + } + create_dir_all(table_dir).expect("test table dir should be created"); + for (idx, rows) in partitions.into_iter().enumerate() { + write(table_dir.join(format!("part-{idx}.csv")), rows) + .expect("test table csv partition should be written"); + } + + let table_path = format!( + "{}/", + table_dir + .to_str() + .expect("test table path should be valid utf8") + ); + let table_url = + ListingTableUrl::parse(&table_path).expect("test table url should parse"); + let options = + ListingOptions::new(Arc::new(CsvFormat::default().with_has_header(false))) + .with_output_partitioning(output_partitioning); + let config = ListingTableConfig::new(table_url) + .with_listing_options(options) + .with_schema(schema); + let table = + ListingTable::try_new(config).expect("test listing table should be valid"); + + ctx.register_table(name, Arc::new(table)) + .expect("test listing table registration should succeed"); } diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt b/datafusion/sqllogictest/test_files/range_partitioning.slt index a61f17a039eb8..2b7a2cfdf4083 100644 --- a/datafusion/sqllogictest/test_files/range_partitioning.slt +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -16,7 +16,7 @@ # under the License. # The sqllogictest harness registers range_partitioned(range_key, non_range_key, value) -# as an in-memory source with four physical source partitions: +# as a CSV ListingTable with four declared range-partitioned file groups: # # partition 0: range_key in [..., 10), rows (1, 1, 10), (5, 2, 50) # partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150) @@ -40,7 +40,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key ORDER BY range_key; @@ -69,7 +69,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=UnknownPartitioning(4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[non_range_key, value], output_partitioning=UnknownPartitioning(4), file_type=csv, has_header=false query II SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key ORDER BY non_range_key; @@ -104,8 +104,8 @@ SELECT range_key, value FROM range_partitioned; ---- physical_plan 01)UnionExec -02)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) -03)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false +03)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, value FROM range_partitioned