From 740db64a9d912ec8f99b408eea9a90f138d114a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 12 Jun 2026 09:01:34 +0200 Subject: [PATCH 1/2] Split Parquet tail work into morsels --- datafusion/common/src/config.rs | 7 + .../common/src/file_options/parquet_writer.rs | 3 + .../datasource-parquet/src/access_plan.rs | 395 ++++++++++++++++ .../src/decoder_projection.rs | 1 + .../datasource-parquet/src/opener/mod.rs | 429 +++++++++++++----- datafusion/datasource-parquet/src/source.rs | 6 +- .../datasource/src/file_stream/builder.rs | 7 +- .../datasource/src/file_stream/scan_state.rs | 28 +- .../datasource/src/file_stream/work_source.rs | 204 ++++++++- datafusion/datasource/src/morsel/mod.rs | 51 +++ .../proto/datafusion_common.proto | 6 + datafusion/proto-common/src/from_proto/mod.rs | 3 + .../proto-common/src/generated/pbjson.rs | 24 + .../proto-common/src/generated/prost.rs | 13 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 13 + .../proto/src/logical_plan/file_formats.rs | 10 + .../test_files/information_schema.slt | 2 + docs/source/user-guide/configs.md | 1 + 19 files changed, 1079 insertions(+), 125 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b10761a5fe816..f3a9cbfbba49f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -979,6 +979,13 @@ config_namespace! { /// parquet reader setting. 0 means no caching. pub max_predicate_cache_size: Option, default = None + /// (reading) Target size, in compressed bytes, of the morsels a file + /// scan is split into when too little work remains to keep all output + /// streams busy (the tail of a scan). Row groups larger than this are + /// split at page-aligned row ranges and the surplus morsels are shared + /// with idle sibling streams. If NULL, splitting is disabled. + pub morsel_split_size: Option, default = Some(1024 * 1024) + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a5b270a8f57b6..2f3b9a95a6a7f 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -248,6 +248,7 @@ impl ParquetOptions { coerce_int96_tz: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + morsel_split_size: _, // reads not used for writer props } = self; let mut builder = WriterProperties::builder() @@ -506,6 +507,7 @@ mod tests { coerce_int96: None, coerce_int96_tz: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + morsel_split_size: defaults.morsel_split_size, content_defined_chunking: defaults.content_defined_chunking.clone(), } } @@ -622,6 +624,7 @@ mod tests { bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, max_predicate_cache_size: global_options_defaults .max_predicate_cache_size, + morsel_split_size: global_options_defaults.morsel_split_size, schema_force_view_types: global_options_defaults.schema_force_view_types, binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index edbea39948f09..c1c9c1b1b93a7 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -452,6 +452,116 @@ impl ParquetAccessPlan { } } + /// Split this plan into multiple plans, each covering roughly + /// `target_bytes` of compressed data. + /// + /// Row groups larger than the target are split sub-row-group by + /// converting them into [`RowGroupAccess::Selection`]s over disjoint row + /// ranges; smaller row groups are packed together until a chunk reaches + /// the target. Existing selections are kept intact and packed by their + /// pro-rated size. + /// + /// `page_row_boundaries` holds, per row group, the candidate row offsets + /// where a sub-row-group split may be cut (ascending, exclusive of `0` + /// and `num_rows`), typically the page boundaries of the dominant + /// projected column from the offset index. Cutting anywhere else would + /// make the streams on both sides of the cut fetch and decompress the + /// page containing it. Row groups with no candidate boundaries (e.g. no + /// offset index loaded) are never split sub-row-group, since without + /// page locations the reader fetches whole column chunks per stream, + /// duplicating that I/O once per piece. + /// + /// Fully-matched markers are preserved so chunks keep skipping row filter + /// evaluation where the original plan did. + /// + /// This is used to increase parallelism at the tail of a scan, where + /// sibling streams would otherwise go idle (see + /// [`SplitHint`](datafusion_datasource::morsel::SplitHint)): each returned + /// plan becomes its own morsel. Always returns at least one plan, and the + /// chunks concatenated in order cover exactly the rows of `self`. + pub(crate) fn split_by_size( + self, + row_group_meta_data: &[RowGroupMetaData], + target_bytes: u64, + page_row_boundaries: Option<&[Vec]>, + ) -> Vec { + let num_row_groups = self.row_groups.len(); + let fully_matched = self.fully_matched; + + let mut packer = ChunkPacker::new(num_row_groups, target_bytes); + for (idx, access) in self.row_groups.into_iter().enumerate() { + let rg = &row_group_meta_data[idx]; + let rg_bytes = rg.compressed_size().max(0) as u64; + let rg_rows = rg.num_rows().max(0) as usize; + // Estimate the compressed size of a row range by its row fraction. + let range_bytes = |rows: usize| { + if rg_rows == 0 { + 0 + } else { + rg_bytes * rows as u64 / rg_rows as u64 + } + }; + match access { + RowGroupAccess::Skip => {} + RowGroupAccess::Selection(selection) => { + let bytes = range_bytes(selection.row_count()); + packer.add( + idx, + RowGroupAccess::Selection(selection), + bytes, + fully_matched[idx], + ); + } + RowGroupAccess::Scan if rg_bytes <= target_bytes => { + packer.add(idx, RowGroupAccess::Scan, rg_bytes, fully_matched[idx]); + } + RowGroupAccess::Scan => { + // Sub-row-group split: cut the row group into roughly + // target-sized row ranges at page-aligned boundaries. + let boundaries = page_row_boundaries + .and_then(|per_rg| per_rg.get(idx)) + .map(Vec::as_slice) + .unwrap_or(&[]); + let mut start = 0; + for &boundary in boundaries { + if boundary <= start || boundary >= rg_rows { + continue; + } + let bytes = range_bytes(boundary - start); + if bytes < target_bytes { + // keep growing the current piece + continue; + } + packer.add( + idx, + select_row_range(start, boundary - start, rg_rows), + bytes, + fully_matched[idx], + ); + start = boundary; + } + if start == 0 { + // No usable boundary: keep the row group whole. + packer.add( + idx, + RowGroupAccess::Scan, + rg_bytes, + fully_matched[idx], + ); + } else if start < rg_rows { + packer.add( + idx, + select_row_range(start, rg_rows - start, rg_rows), + range_bytes(rg_rows - start), + fully_matched[idx], + ); + } + } + } + } + packer.finish() + } + /// Prepare this plan and resolve to the final `PreparedAccessPlan` pub(crate) fn prepare( self, @@ -464,6 +574,101 @@ impl ParquetAccessPlan { } } +/// A [`RowGroupAccess`] selecting rows `[start, start + len)` of a row group +/// with `num_rows` rows. +fn select_row_range(start: usize, len: usize, num_rows: usize) -> RowGroupAccess { + let mut selectors = Vec::with_capacity(3); + if start > 0 { + selectors.push(RowSelector::skip(start)); + } + selectors.push(RowSelector::select(len)); + if start + len < num_rows { + selectors.push(RowSelector::skip(num_rows - start - len)); + } + RowGroupAccess::Selection(RowSelection::from(selectors)) +} + +/// Greedily packs row group pieces into [`ParquetAccessPlan`] chunks of +/// roughly `target_bytes` each. See [`ParquetAccessPlan::split_by_size`]. +struct ChunkPacker { + num_row_groups: usize, + target_bytes: u64, + chunks: Vec, + current: ParquetAccessPlan, + current_bytes: u64, + current_has_pieces: bool, +} + +impl ChunkPacker { + fn new(num_row_groups: usize, target_bytes: u64) -> Self { + Self { + num_row_groups, + target_bytes, + chunks: Vec::new(), + current: ParquetAccessPlan::new_none(num_row_groups), + current_bytes: 0, + current_has_pieces: false, + } + } + + /// Add one piece (a whole row group or a row range within one) to the + /// current chunk, starting a new chunk first if the piece would push the + /// current chunk past the target size. + fn add( + &mut self, + idx: usize, + access: RowGroupAccess, + bytes: u64, + fully_matched: bool, + ) { + if self.current_has_pieces && self.current_bytes + bytes > self.target_bytes { + self.flush(); + } + match (&mut self.current.row_groups[idx], access) { + // Defensive: consecutive sub-row-group pieces of one row group are + // each larger than half the target, so they normally land in + // different chunks; merge them if they ever share one. + (RowGroupAccess::Selection(existing), RowGroupAccess::Selection(new)) => { + *existing = existing.union(&new); + } + (slot @ RowGroupAccess::Skip, access) => *slot = access, + // Pieces are produced in row group order and `Scan` pieces cover + // the whole row group, so other combinations cannot occur. + (existing, access) => unreachable!( + "conflicting accesses for row group {idx}: {existing:?} and {access:?}" + ), + } + if fully_matched { + self.current.mark_fully_matched(idx); + } + self.current_bytes += bytes; + self.current_has_pieces = true; + } + + fn flush(&mut self) { + if self.current_has_pieces { + let chunk = std::mem::replace( + &mut self.current, + ParquetAccessPlan::new_none(self.num_row_groups), + ); + self.chunks.push(chunk); + self.current_bytes = 0; + self.current_has_pieces = false; + } + } + + /// Flush any partial chunk and return the chunks, always at least one + /// (possibly all-skip) plan. + fn finish(mut self) -> Vec { + self.flush(); + if self.chunks.is_empty() { + self.chunks + .push(ParquetAccessPlan::new_none(self.num_row_groups)); + } + self.chunks + } +} + /// Represents a prepared, fully resolved [`ParquetAccessPlan`] /// /// The [`RowSelection`] represents the result of applying all pruning such as @@ -849,6 +1054,196 @@ mod test { Arc::new(SchemaDescriptor::new(Arc::new(schema))) } + // ---------------------------------------------------------------- + // `split_by_size` tests + // ---------------------------------------------------------------- + + const KB: u64 = 1024; + const MB: u64 = 1024 * 1024; + + /// Build row group metadata with the given `(num_rows, compressed_bytes)` + /// per row group + fn rg_metadata_with_sizes(specs: &[(i64, i64)]) -> Vec { + let schema_descr = get_test_schema_descr(); + specs + .iter() + .map(|(num_rows, compressed_bytes)| { + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(*num_rows) + .set_total_compressed_size(*compressed_bytes) + .build() + .unwrap(); + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(*num_rows) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect() + } + + /// Sum of selected rows in `plan` for row group `idx` + fn selected_rows(plan: &ParquetAccessPlan, idx: usize, num_rows: usize) -> usize { + match &plan.inner()[idx] { + RowGroupAccess::Skip => 0, + RowGroupAccess::Scan => num_rows, + RowGroupAccess::Selection(selection) => selection.row_count(), + } + } + + #[test] + fn test_split_by_size_packs_small_row_groups() { + // 3 row groups of 400KB: the first two fit in one ~1MB chunk, the + // third starts a new chunk + let metadata = rg_metadata_with_sizes(&[(100i64, 400 * KB as i64); 3]); + let plan = ParquetAccessPlan::new_all(3); + + let chunks = plan.split_by_size(&metadata, MB, None); + + assert_eq!(chunks.len(), 2); + assert_eq!( + chunks[0].inner(), + &[ + RowGroupAccess::Scan, + RowGroupAccess::Scan, + RowGroupAccess::Skip + ] + ); + assert_eq!( + chunks[1].inner(), + &[ + RowGroupAccess::Skip, + RowGroupAccess::Skip, + RowGroupAccess::Scan + ] + ); + } + + #[test] + fn test_split_by_size_splits_large_row_group_at_page_boundaries() { + // one 3.5MB row group with 1000 rows and a page every 100 rows: + // pieces grow page by page and are cut once they reach ~1MB + // (300 rows), leaving the 100-row tail as its own chunk + let metadata = rg_metadata_with_sizes(&[(1000, (3 * MB + MB / 2) as i64)]); + let plan = ParquetAccessPlan::new_all(1); + let boundaries = vec![(100..1000).step_by(100).collect::>()]; + + let chunks = plan.split_by_size(&metadata, MB, Some(&boundaries)); + + assert_eq!(chunks.len(), 4); + let mut next_row = 0; + for (chunk, expected_rows) in chunks.iter().zip([300, 300, 300, 100]) { + let RowGroupAccess::Selection(selection) = &chunk.inner()[0] else { + panic!( + "expected sub-row-group selection, got {:?}", + chunk.inner()[0] + ); + }; + // each chunk covers the next consecutive page-aligned row range + assert_eq!(selection.row_count(), expected_rows); + let mut row = 0; + for selector in selection.iter() { + if !selector.skip { + assert_eq!(row, next_row); + } + row += selector.row_count; + } + assert_eq!(row, 1000, "selection must cover all rows of the group"); + next_row += expected_rows; + } + assert_eq!(next_row, 1000, "chunks must cover the whole row group"); + } + + #[test] + fn test_split_by_size_keeps_row_group_whole_without_page_boundaries() { + // Without page boundaries (no offset index), splitting inside a row + // group would duplicate page fetches, so the row group stays whole. + let metadata = rg_metadata_with_sizes(&[(1000, (3 * MB + MB / 2) as i64)]); + let plan = ParquetAccessPlan::new_all(1); + + let chunks = plan.split_by_size(&metadata, MB, None); + + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].inner(), &[RowGroupAccess::Scan]); + } + + #[test] + fn test_split_by_size_preserves_skips_and_selections() { + let selection: RowSelection = + vec![RowSelector::select(10), RowSelector::skip(90)].into(); + let plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, + RowGroupAccess::Skip, + RowGroupAccess::Selection(selection.clone()), + ]); + let metadata = rg_metadata_with_sizes(&[ + (100, 2 * KB as i64), + (100, 5 * MB as i64), // skipped: size must not matter + (100, 2 * KB as i64), + ]); + + let chunks = plan.split_by_size(&metadata, MB, None); + + // everything is tiny, so it all packs into a single chunk + assert_eq!(chunks.len(), 1); + assert_eq!( + chunks[0].inner(), + &[ + RowGroupAccess::Scan, + RowGroupAccess::Skip, + RowGroupAccess::Selection(selection), + ] + ); + } + + #[test] + fn test_split_by_size_preserves_fully_matched() { + let metadata = rg_metadata_with_sizes(&[(1000, 3 * MB as i64)]); + let mut plan = ParquetAccessPlan::new_all(1); + plan.mark_fully_matched(0); + let boundaries = vec![(100..1000).step_by(100).collect::>()]; + + let chunks = plan.split_by_size(&metadata, MB, Some(&boundaries)); + + assert!(chunks.len() > 1); + for chunk in &chunks { + assert!(chunk.is_fully_matched(0)); + } + } + + #[test] + fn test_split_by_size_covers_all_rows() { + let metadata = rg_metadata_with_sizes(&[ + (100, 400 * KB as i64), + (1000, (2 * MB + MB / 4) as i64), + (50, 10 * KB as i64), + ]); + let plan = ParquetAccessPlan::new_all(3); + let boundaries = + vec![vec![], (100..1000).step_by(100).collect::>(), vec![]]; + + let chunks = plan.split_by_size(&metadata, MB, Some(&boundaries)); + + assert!(chunks.len() > 1); + for (idx, num_rows) in [(0, 100), (1, 1000), (2, 50)] { + let total: usize = chunks + .iter() + .map(|chunk| selected_rows(chunk, idx, num_rows)) + .sum(); + assert_eq!(total, num_rows, "row group {idx}"); + } + } + + #[test] + fn test_split_by_size_empty_plan() { + let metadata = rg_metadata_with_sizes(&[(100, 400 * KB as i64); 2]); + let plan = ParquetAccessPlan::new_none(2); + + let chunks = plan.split_by_size(&metadata, MB, None); + + assert_eq!(chunks, vec![ParquetAccessPlan::new_none(2)]); + } + // ---------------------------------------------------------------- // `reorder_by_statistics` tests // ---------------------------------------------------------------- diff --git a/datafusion/datasource-parquet/src/decoder_projection.rs b/datafusion/datasource-parquet/src/decoder_projection.rs index 27a84f2f50298..217eff21c8756 100644 --- a/datafusion/datasource-parquet/src/decoder_projection.rs +++ b/datafusion/datasource-parquet/src/decoder_projection.rs @@ -53,6 +53,7 @@ use crate::row_filter::build_projection_read_plan; /// Built once per file by the opener via [`Self::try_new`]; the /// push-decoder stream installs [`Self::projection_mask`] on each decoder /// and calls [`Self::map`] on every decoded batch. +#[derive(Clone)] pub(crate) struct DecoderProjection { projection_mask: ProjectionMask, projector: Projector, diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 5b517663f9c03..6cc6ae853a67b 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -35,7 +35,9 @@ use crate::{ }; use arrow::array::RecordBatch; use arrow::datatypes::DataType; -use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use datafusion_datasource::morsel::{ + Morsel, MorselPlan, MorselPlanner, Morselizer, SplitHint, +}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::replace_columns_with_literals; use std::collections::{HashMap, VecDeque}; @@ -70,13 +72,14 @@ use datafusion_execution::parquet_encryption::EncryptionFactory; use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream}; use log::debug; use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::parquet_column; use parquet::basic::Type; use parquet::bloom_filter::Sbbf; -use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; /// Morselizer-level state for virtual columns, precomputed once per scan /// partition so each file skips the validator walks, `null_replacements` @@ -292,6 +295,13 @@ pub(super) struct ParquetMorselizer { /// Per-scan virtual-column state (validation already performed). `None` /// when no virtual columns are requested — the common path. pub(crate) virtual_state: Option>, + /// Hint consulted while building the final stream to decide whether the + /// remaining work should be split into multiple smaller morsels (roughly + /// `morsel_split_size` bytes each) for donation to sibling streams. + pub(crate) split_hint: SplitHint, + /// Target compressed bytes per morsel when splitting; `None` disables + /// splitting. From `datafusion.execution.parquet.morsel_split_size`. + pub(crate) morsel_split_size: Option, } impl fmt::Debug for ParquetMorselizer { @@ -309,6 +319,10 @@ impl Morselizer for ParquetMorselizer { fn plan_file(&self, file: PartitionedFile) -> Result> { Ok(Box::new(ParquetMorselPlanner::try_new(self, file)?)) } + + fn set_split_hint(&mut self, hint: SplitHint) { + self.split_hint = hint; + } } /// States for [`ParquetMorselPlanner`] @@ -380,12 +394,15 @@ enum ParquetOpenState { LoadBloomFilters(BoxFuture<'static, Result>), /// Pruning with preloaded Bloom Filters PruneWithBloomFilters(Box), - /// Builds the final reader stream + /// Builds the final reader stream(s) /// /// TODO: split state as this currently does both I/O and CPU work. BuildStream(Box), - /// Terminal state: the final opened stream is ready to return. - Ready(BoxStream<'static, Result>), + /// Terminal state: the final opened stream(s) are ready to return. + /// + /// Usually a single stream for the whole file; multiple streams when the + /// remaining work was split into smaller morsels for sibling streams. + Ready(Vec>>), /// Terminal state: reading complete Done, } @@ -450,10 +467,34 @@ struct PreparedParquetOpen { reverse_row_groups: bool, sort_order_for_reorder: Option, preserve_order: bool, + split_hint: SplitHint, + morsel_split_size: Option, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, } +impl PreparedParquetOpen { + /// Return the target morsel size, in compressed bytes, when the remaining + /// work of this file should be split into multiple morsels; `None` + /// otherwise. + /// + /// Splitting is only attempted when ordering does not matter (no + /// row-group reordering heuristics in play), there is no limit (a limit + /// is best served by a single stream that stops early), and the scan's + /// [`SplitHint`] reports that parallelism is currently low. + fn morsel_split_size(&self) -> Option { + if self.limit.is_some() + || self.preserve_order + || self.reverse_row_groups + || self.sort_order_for_reorder.is_some() + { + return None; + } + let target = self.morsel_split_size? as u64; + self.split_hint.should_split().then_some(target) + } +} + /// State of [`ParquetOpenState`] /// /// Result of loading parquet metadata after file-level pruning is complete. @@ -560,9 +601,9 @@ impl ParquetOpenState { ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())), ), ParquetOpenState::BuildStream(prepared) => { - Ok(ParquetOpenState::Ready(prepared.build_stream()?)) + Ok(ParquetOpenState::Ready(prepared.build_streams()?)) } - ParquetOpenState::Ready(stream) => Ok(ParquetOpenState::Ready(stream)), + ParquetOpenState::Ready(streams) => Ok(ParquetOpenState::Ready(streams)), ParquetOpenState::Done => { panic!("ParquetOpenFuture polled after completion"); } @@ -678,9 +719,13 @@ impl MorselPlanner for ParquetMorselPlanner { ))) }))) } - ParquetOpenState::Ready(stream) => { - let morsels: Vec> = - vec![Box::new(ParquetStreamMorsel::new(stream))]; + ParquetOpenState::Ready(streams) => { + let morsels: Vec> = streams + .into_iter() + .map(|stream| { + Box::new(ParquetStreamMorsel::new(stream)) as Box + }) + .collect(); Ok(Some(MorselPlan::new().with_morsels(morsels))) } ParquetOpenState::Done => Ok(None), @@ -823,6 +868,8 @@ impl ParquetMorselizer { reverse_row_groups: self.reverse_row_groups, sort_order_for_reorder: self.sort_order_for_reorder.clone(), preserve_order: self.preserve_order, + split_hint: self.split_hint.clone(), + morsel_split_size: self.morsel_split_size, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, }) @@ -1053,7 +1100,16 @@ impl FiltersPreparedParquetOpen { // metadata load above may not have read the page index structures yet. // If we need them for reading and they aren't yet loaded, we need to // load them now. - if self.page_pruning_predicate.is_some() { + // + // Besides page pruning, the offset index is also needed to split this + // file into multiple morsels: sub-row-group cuts are aligned to page + // boundaries so the resulting streams do not fetch and decompress + // shared pages. `morsel_split_size()` consults the live `SplitHint`, + // so this extra (small) read is only paid at the tail of a scan when + // a split is actually anticipated. + if self.page_pruning_predicate.is_some() + || self.loaded.prepared.morsel_split_size().is_some() + { self.loaded.reader_metadata = load_page_index( self.loaded.reader_metadata, &mut self.loaded.prepared.async_file_reader, @@ -1250,8 +1306,14 @@ impl BloomFiltersLoadedParquetOpen { } impl RowGroupsPrunedParquetOpen { - /// Build the final parquet stream once all pruning work is complete. - fn build_stream(self) -> Result>> { + /// Build the final parquet stream(s) once all pruning work is complete. + /// + /// Normally this returns a single stream for the whole file. When the + /// scan's [`SplitHint`] reports that parallelism is low (fewer remaining + /// work items than streams), the remaining work is instead split into + /// multiple independent streams of roughly [`MORSEL_SPLIT_TARGET_BYTES`] + /// each, so the surplus morsels can be donated to idle sibling streams. + fn build_streams(self) -> Result>>> { let RowGroupsPrunedParquetOpen { prepared, mut row_groups, @@ -1262,7 +1324,7 @@ impl RowGroupsPrunedParquetOpen { page_pruning_predicate, } = prepared; let MetadataLoadedParquetOpen { - prepared, + mut prepared, reader_metadata, options: _, } = loaded; @@ -1339,8 +1401,6 @@ impl RowGroupsPrunedParquetOpen { Ok(prepared_plan) }; - let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - // Build the decoder projection (mask + per-batch transform) in a // single call. Encapsulating it behind `DecoderProjection` keeps the // opener's orchestration body focused on filter / decoder / stream @@ -1353,101 +1413,148 @@ impl RowGroupsPrunedParquetOpen { prepared.virtual_state.as_deref(), )?; - let (decoder, pending_decoders, remaining_limit) = { - let pushdown_predicate = prepared - .pushdown_filters - .then_some(prepared.predicate.as_ref()) - .flatten(); - let mut row_filter_generator = RowFilterGenerator::new( - pushdown_predicate, - &prepared.physical_file_schema, - file_metadata.as_ref(), - prepared.reorder_predicates, - &prepared.file_metrics, - ); - - // Split into consecutive runs of row groups that share the same filter - // requirement. Fully matched row groups skip the RowFilter; others need it. - // Reverse the run order for reverse scans so the combined decoder stream - // preserves the requested global row group order. - let mut runs = access_plan.split_runs(row_filter_generator.has_row_filter()); - if prepared.reverse_row_groups { - runs.reverse(); + // Decide whether to split the remaining work into multiple morsels to + // increase tail-of-scan parallelism. Splitting is only attempted when + // ordering does not matter (no row-group reordering heuristics in + // play) and there is no limit (a limit is best served by a single + // stream that stops early). Sub-row-group cuts are aligned to page + // boundaries (from the offset index, when loaded) so morsels on both + // sides of a cut do not fetch and decompress the same page. + let access_plans = match prepared.morsel_split_size() { + Some(target_bytes) => { + let page_boundaries = page_aligned_row_boundaries( + file_metadata.as_ref(), + decoder_projection.projection_mask(), + ); + access_plan.split_by_size( + rg_metadata, + target_bytes, + page_boundaries.as_deref(), + ) } - let run_count = runs.len(); - let decoder_limit = prepared.limit.filter(|_| run_count == 1); - let remaining_limit = prepared.limit.filter(|_| run_count > 1); - - let decoder_config = DecoderBuilderConfig { - projection_mask: decoder_projection.projection_mask(), - batch_size: prepared.batch_size, - arrow_reader_metrics: &arrow_reader_metrics, - force_filter_selections: prepared.force_filter_selections, - decoder_limit, + None => vec![access_plan], + }; + + // The dynamic-filter early-stopping wrapper holds per-file pruner + // state, so attach it to the first stream only. + let mut file_pruner = prepared.file_pruner.take(); + // The first stream reuses the file's existing reader; additional + // streams get fresh readers from the factory. + let mut first_reader = Some(prepared.async_file_reader); + + let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + + let pushdown_predicate = prepared + .pushdown_filters + .then_some(prepared.predicate.as_ref()) + .flatten(); + let mut row_filter_generator = RowFilterGenerator::new( + pushdown_predicate, + &prepared.physical_file_schema, + file_metadata.as_ref(), + prepared.reorder_predicates, + &prepared.file_metrics, + ); + + let mut streams = Vec::with_capacity(access_plans.len()); + for access_plan in access_plans { + let reader = match first_reader.take() { + Some(reader) => reader, + None => prepared.parquet_file_reader_factory.create_reader( + prepared.partition_index, + prepared.partitioned_file.clone(), + prepared.metadata_size_hint, + &prepared.metrics, + )?, }; - // Build a decoder per run. - let mut decoders = VecDeque::with_capacity(runs.len()); - for run in runs { - let prepared_access_plan = prepare_access_plan(run.access_plan)?; - let mut builder = - decoder_config.build(prepared_access_plan, reader_metadata.clone()); - if run.needs_filter { - if let Some(row_filter) = row_filter_generator.next_filter() { - builder = builder.with_row_filter(row_filter); - } - if let Some(max_predicate_cache_size) = - prepared.max_predicate_cache_size - { - builder = builder - .with_max_predicate_cache_size(max_predicate_cache_size); + let (decoder, pending_decoders, remaining_limit) = { + // Split into consecutive runs of row groups that share the same filter + // requirement. Fully matched row groups skip the RowFilter; others need it. + // Reverse the run order for reverse scans so the combined decoder stream + // preserves the requested global row group order. + let mut runs = + access_plan.split_runs(row_filter_generator.has_row_filter()); + if prepared.reverse_row_groups { + runs.reverse(); + } + let run_count = runs.len(); + let decoder_limit = prepared.limit.filter(|_| run_count == 1); + let remaining_limit = prepared.limit.filter(|_| run_count > 1); + + let decoder_config = DecoderBuilderConfig { + projection_mask: decoder_projection.projection_mask(), + batch_size: prepared.batch_size, + arrow_reader_metrics: &arrow_reader_metrics, + force_filter_selections: prepared.force_filter_selections, + decoder_limit, + }; + + // Build a decoder per run. + let mut decoders = VecDeque::with_capacity(runs.len()); + for run in runs { + let prepared_access_plan = prepare_access_plan(run.access_plan)?; + let mut builder = decoder_config + .build(prepared_access_plan, reader_metadata.clone()); + if run.needs_filter { + if let Some(row_filter) = row_filter_generator.next_filter() { + builder = builder.with_row_filter(row_filter); + } + if let Some(max_predicate_cache_size) = + prepared.max_predicate_cache_size + { + builder = builder + .with_max_predicate_cache_size(max_predicate_cache_size); + } } + decoders.push_back(builder.build()?); } - decoders.push_back(builder.build()?); - } - let decoder = decoders - .pop_front() - .expect("at least one decoder must be created"); - (decoder, decoders, remaining_limit) - }; + let decoder = decoders + .pop_front() + .expect("at least one decoder must be created"); + (decoder, decoders, remaining_limit) + }; - let predicate_cache_inner_records = - prepared.file_metrics.predicate_cache_inner_records.clone(); - let predicate_cache_records = - prepared.file_metrics.predicate_cache_records.clone(); - - let files_ranges_pruned_statistics = - prepared.file_metrics.files_ranges_pruned_statistics.clone(); - let stream = PushDecoderStreamState { - decoder, - pending_decoders, - remaining_limit, - reader: prepared.async_file_reader, - decoder_projection, - arrow_reader_metrics, - predicate_cache_inner_records, - predicate_cache_records, - baseline_metrics: prepared.baseline_metrics, - } - .into_stream(); - - // Wrap the stream so a dynamic filter can stop the file scan early, but - // only when the pruner is still watching a filter that can change - // mid-scan. For a static (or already-complete) predicate the up-front - // `prune_file` check already captured everything that can be pruned, so - // per-batch re-checking would only add overhead. - match prepared.file_pruner { - Some(file_pruner) if file_pruner.is_watching() => { - Ok(EarlyStoppingStream::new( - stream, - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) + let stream = PushDecoderStreamState { + decoder, + pending_decoders, + remaining_limit, + reader, + decoder_projection: decoder_projection.clone(), + arrow_reader_metrics: arrow_reader_metrics.clone(), + predicate_cache_inner_records: prepared + .file_metrics + .predicate_cache_inner_records + .clone(), + predicate_cache_records: prepared + .file_metrics + .predicate_cache_records + .clone(), + baseline_metrics: prepared.baseline_metrics.clone(), } - _ => Ok(stream), + .into_stream(); + + // Wrap the stream so a dynamic filter can stop the file scan early, but + // only when the pruner is still watching a filter that can change + // mid-scan. For a static (or already-complete) predicate the up-front + // `prune_file` check already captured everything that can be pruned, so + // per-batch re-checking would only add overhead. + let stream = match file_pruner.take() { + Some(file_pruner) if file_pruner.is_watching() => { + EarlyStoppingStream::new( + stream, + file_pruner, + prepared.file_metrics.files_ranges_pruned_statistics.clone(), + ) + .boxed() + } + _ => stream, + }; + streams.push(stream); } + + Ok(streams) } } @@ -1513,6 +1620,48 @@ fn constant_value_from_stats( None } +/// Return, for each row group, the row offsets where a sub-row-group morsel +/// split may be cut without sharing a page between the resulting morsels. +/// +/// Pages of different columns do not align on the same rows, so exact +/// alignment for every column is impossible; the boundaries are taken from +/// the projected column with the largest compressed size (the dominant I/O +/// and decompression cost). Other columns share at most one page per cut. +/// +/// Returns `None` when the offset index is not loaded; callers must then not +/// split sub-row-group, because without page locations the reader fetches +/// whole column chunks for every piece, duplicating that I/O once per piece. +fn page_aligned_row_boundaries( + file_metadata: &ParquetMetaData, + projection_mask: &ProjectionMask, +) -> Option>> { + let offset_index = file_metadata.offset_index()?; + let boundaries = file_metadata + .row_groups() + .iter() + .zip(offset_index) + .map(|(rg, rg_offset_index)| { + let largest_projected_column = rg + .columns() + .iter() + .enumerate() + .filter(|(leaf_idx, _)| projection_mask.leaf_included(*leaf_idx)) + .max_by_key(|(_, column)| column.compressed_size()); + match largest_projected_column { + Some((leaf_idx, _)) => rg_offset_index[leaf_idx] + .page_locations() + .iter() + .skip(1) // the first page starts at row 0, not a cut point + .map(|page| page.first_row_index as usize) + .collect(), + // No projected file columns: nothing to gain from splitting. + None => Vec::new(), + } + }) + .collect(); + Some(boundaries) +} + /// Return the initial [`ParquetAccessPlan`] /// /// If the user has supplied one as an extension, use that @@ -1842,6 +1991,9 @@ mod test { reverse_row_groups: self.reverse_row_groups, sort_order_for_reorder: None, virtual_state, + split_hint: SplitHint::disabled(), + // Matches the `morsel_split_size` config default + morsel_split_size: Some(1024 * 1024), }) } } @@ -1884,6 +2036,79 @@ mod test { } } + /// Like [`open_file`], but drives planning to completion and returns the + /// streams of *all* morsels produced. More than one stream is returned + /// when the remaining work was split into multiple morsels. + async fn open_file_morsels( + morselizer: &ParquetMorselizer, + file: PartitionedFile, + ) -> Result>>> { + let mut planners = VecDeque::from([morselizer.plan_file(file)?]); + let mut streams = vec![]; + + while let Some(planner) = planners.pop_front() { + if let Some(mut plan) = planner.plan()? { + streams.extend(plan.take_morsels().into_iter().map(Morsel::into_stream)); + planners.extend(plan.take_ready_planners()); + if let Some(pending_planner) = plan.take_pending_planner() { + planners.push_front(pending_planner.await?); + } + } + } + Ok(streams) + } + + #[tokio::test] + async fn test_split_hint_splits_file_into_multiple_morsels() { + use datafusion_datasource::morsel::SplitHint; + + let store = Arc::new(InMemory::new()) as Arc; + + // A single ~2.4MB (uncompressed) row group of 600k rows: well above + // the ~1MB morsel split target, so it must be split sub-row-group. + let num_rows: i32 = 600_000; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(arrow::array::Int32Array::from_iter_values( + 0..num_rows, + ))], + ) + .unwrap(); + let data_size = + write_parquet_batches(Arc::clone(&store), "test.parquet", vec![batch], None) + .await; + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let mut morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .build(); + + // Without a split hint the whole file is a single morsel + let streams = open_file_morsels(&morselizer, file.clone()).await.unwrap(); + assert_eq!(streams.len(), 1); + + // With a split hint asking for parallelism the file is split into + // multiple morsels that together yield all rows, in order + morselizer.set_split_hint(SplitHint::new(|| true)); + let streams = open_file_morsels(&morselizer, file).await.unwrap(); + assert!( + streams.len() > 1, + "expected the scan to be split into multiple morsels" + ); + + let mut values = vec![]; + for stream in streams { + values.extend(collect_int32_values(stream).await); + } + assert_eq!(values, (0..num_rows).collect::>()); + } + fn constant_int_stats() -> (Statistics, SchemaRef) { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 8228cd273eae6..2c48ff4f5c7e2 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -31,7 +31,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::morsel::Morselizer; +use datafusion_datasource::morsel::{Morselizer, SplitHint}; use arrow::array::timezone::Tz; use arrow::datatypes::TimeUnit; @@ -635,6 +635,10 @@ impl FileSource for ParquetSource { reverse_row_groups: self.reverse_row_groups, sort_order_for_reorder: self.sort_order_for_reorder.clone(), virtual_state, + // Installed later via `Morselizer::set_split_hint` when the scan + // has a shared work source. + split_hint: SplitHint::disabled(), + morsel_split_size: self.table_parquet_options.global.morsel_split_size, })) } diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index 7034e902550a9..7f147e07f2d43 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -107,9 +107,14 @@ impl<'a> FileStreamBuilder<'a> { let Some(partition) = partition else { return internal_err!("FileStreamBuilder missing required partition"); }; - let Some(morselizer) = morselizer else { + let Some(mut morselizer) = morselizer else { return internal_err!("FileStreamBuilder missing required morselizer"); }; + // Let the morselizer split files into smaller morsels when the shared + // queue runs low, so the surplus can be donated to sibling streams. + if let Some(shared) = &shared_work_source { + morselizer.set_split_hint(shared.split_hint()); + } let Some(metrics) = metrics else { return internal_err!("FileStreamBuilder missing required metrics"); }; diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 21125cd08896c..3ea9aac23d2c5 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -26,7 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard; use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; -use super::work_source::WorkSource; +use super::work_source::{WorkItem, WorkSource}; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -175,6 +175,9 @@ impl ScanState { let done = 1 + self.work_source.skipped_on_limit(); self.metrics.files_processed.add(done); *remain = 0; + // This stream stops at the limit and will not + // pick up shared work anymore. + self.work_source.mark_finished(); (batch, true) } } @@ -234,6 +237,10 @@ impl ScanState { Ok(Some(mut plan)) => { // Queue any newly-ready morsels, planners, or planner I/O. self.ready_morsels.extend(plan.take_morsels()); + // If planning split the file into multiple morsels, keep + // the first and share the surplus with sibling streams. + self.work_source + .donate_surplus_morsels(&mut self.ready_morsels); self.ready_planners.extend(plan.take_ready_planners()); if let Some(pending_planner) = plan.take_pending_planner() { // should not have planned if we have outstanding I/O @@ -265,10 +272,21 @@ impl ScanState { }; } - // No outstanding work remains, so begin planning the next unopened file. - let part_file = match self.work_source.pop_front() { - Some(part_file) => part_file, - None => return ScanAndReturn::Done(None), + // No outstanding work remains, so pull the next work item. Donated + // morsels from sibling streams are ready for decoding immediately; + // files still need to be planned. + let part_file = match self.work_source.pop_work() { + Some(WorkItem::File(part_file)) => *part_file, + Some(WorkItem::Morsel(morsel)) => { + self.ready_morsels.push_back(morsel); + return ScanAndReturn::Continue; + } + None => { + // This stream terminates and can no longer pick up shared + // work. + self.work_source.mark_finished(); + return ScanAndReturn::Done(None); + } }; self.metrics.time_opening.start(); diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index c00048453b304..d874a204ccd71 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -21,14 +21,25 @@ use std::sync::Arc; use crate::PartitionedFile; use crate::file_groups::FileGroup; use crate::file_scan_config::FileScanConfig; +use crate::morsel::{Morsel, SplitHint}; use parking_lot::Mutex; +/// A unit of work handed to a `ScanState`. +#[derive(Debug)] +pub(super) enum WorkItem { + /// An unopened file that still needs to be planned. + File(Box), + /// A ready morsel donated by a sibling stream that split its remaining + /// work. + Morsel(Box), +} + /// Source of work for `ScanState`. /// /// Streams that may share work across siblings use [`WorkSource::Shared`], /// while streams that can not share work (e.g. because they must preserve file /// order) use [`WorkSource::Local`]. -#[derive(Debug, Clone)] +#[derive(Debug)] pub(super) enum WorkSource { /// Files this stream will plan locally without sharing them. Local(VecDeque), @@ -37,11 +48,36 @@ pub(super) enum WorkSource { } impl WorkSource { - /// Pop the next file to plan from this work source. - pub(super) fn pop_front(&mut self) -> Option { + /// Pop the next work item from this work source. + pub(super) fn pop_work(&mut self) -> Option { match self { - Self::Local(files) => files.pop_front(), - Self::Shared(shared) => shared.pop_front(), + Self::Local(files) => { + files.pop_front().map(|file| WorkItem::File(Box::new(file))) + } + Self::Shared(shared) => shared.pop_work(), + } + } + + /// Record that the consuming stream has finished and will no longer pick + /// up work, so the split heuristic only considers still-active streams. + pub(super) fn mark_finished(&self) { + if let Self::Shared(shared) = self { + shared.mark_finished(); + } + } + + /// Donate all but the first ready morsel to sibling streams. + /// + /// This is a no-op for local sources: morsels stay queued on the stream + /// that produced them. + pub(super) fn donate_surplus_morsels( + &self, + ready_morsels: &mut VecDeque>, + ) { + if let Self::Shared(shared) = self + && ready_morsels.len() > 1 + { + shared.donate_morsels(ready_morsels.drain(1..)); } } @@ -59,10 +95,10 @@ impl WorkSource { /// /// The queue is created once per execution and shared by all reorderable /// sibling streams for that execution. Whichever stream becomes idle first may -/// take the next unopened file from the front of the queue. +/// take the next work item from the front of the queue. /// /// It uses a [`Mutex`] internally to provide thread-safe access -/// to the shared file queue. +/// to the shared work queues. #[derive(Debug, Clone)] pub(crate) struct SharedWorkSource { inner: Arc, @@ -70,16 +106,39 @@ pub(crate) struct SharedWorkSource { #[derive(Debug, Default)] pub(super) struct SharedWorkSourceInner { - files: Mutex>, + queues: Mutex, + /// Number of sibling streams consuming from this source. + workers: usize, +} + +#[derive(Debug, Default)] +struct WorkQueues { + /// Unopened files that still need to be planned. + files: VecDeque, + /// Morsels donated by sibling streams that split their remaining work. + /// + /// Preferred over opening new files so partially-planned work finishes + /// first. + morsels: VecDeque>, + /// Number of sibling streams that have already run out of work and + /// finished. + finished_workers: usize, } impl SharedWorkSource { - /// Create a shared work source containing the provided unopened files. - pub(crate) fn new(files: impl IntoIterator) -> Self { + /// Create a shared work source consumed by `workers` sibling streams. + pub(crate) fn with_workers( + files: impl IntoIterator, + workers: usize, + ) -> Self { let files = files.into_iter().collect(); Self { inner: Arc::new(SharedWorkSourceInner { - files: Mutex::new(files), + queues: Mutex::new(WorkQueues { + files, + ..Default::default() + }), + workers, }), } } @@ -97,13 +156,126 @@ impl SharedWorkSource { .cloned() .collect(); let files = config.file_source.reorder_files(files); - Self::new(files) + Self::with_workers(files, config.file_groups.len()) } - /// Pop the next file from the shared work queue. + /// Pop the next work item from the shared queues. /// - /// Returns `None` if the queue is empty - fn pop_front(&self) -> Option { - self.inner.files.lock().pop_front() + /// Donated morsels are preferred over unopened files so partially-planned + /// work finishes first. + fn pop_work(&self) -> Option { + let mut queues = self.inner.queues.lock(); + if let Some(morsel) = queues.morsels.pop_front() { + return Some(WorkItem::Morsel(morsel)); + } + queues + .files + .pop_front() + .map(|file| WorkItem::File(Box::new(file))) + } + + /// Record that a consuming stream has finished (ran out of work or hit + /// its limit) and can no longer pick up donated morsels. + fn mark_finished(&self) { + self.inner.queues.lock().finished_workers += 1; + } + + /// Add morsels donated by a stream that split its remaining work. + fn donate_morsels(&self, morsels: impl Iterator>) { + self.inner.queues.lock().morsels.extend(morsels); + } + + /// Return a [`SplitHint`] that asks for splitting only when the remaining + /// shared work is too small to keep all still-active sibling streams busy. + pub(crate) fn split_hint(&self) -> SplitHint { + let inner = Arc::clone(&self.inner); + SplitHint::new(move || { + let queues = inner.queues.lock(); + let active_workers = inner.workers.saturating_sub(queues.finished_workers); + // With a single active stream splitting only adds overhead: there + // is no idle sibling to pick up the donated morsels. + active_workers > 1 + && queues.files.len() + queues.morsels.len() < active_workers + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::morsel::Morsel; + use arrow::array::RecordBatch; + use datafusion_common::Result; + use futures::stream::BoxStream; + + #[derive(Debug)] + struct DummyMorsel; + + impl Morsel for DummyMorsel { + fn into_stream(self: Box) -> BoxStream<'static, Result> { + Box::pin(futures::stream::empty()) + } + } + + fn test_file() -> PartitionedFile { + PartitionedFile::new("test.parquet", 1024) + } + + #[test] + fn split_hint_disabled_for_single_worker() { + let source = SharedWorkSource::with_workers(vec![], 1); + assert!(!source.split_hint().should_split()); + } + + #[test] + fn split_hint_tracks_remaining_work_and_active_workers() { + let files = vec![test_file(), test_file(), test_file()]; + let source = SharedWorkSource::with_workers(files, 2); + let hint = source.split_hint(); + + // 3 files remaining >= 2 workers: no split + assert!(!hint.should_split()); + + assert!(source.pop_work().is_some()); + assert!(source.pop_work().is_some()); + // 1 file remaining < 2 workers: split + assert!(hint.should_split()); + + assert!(source.pop_work().is_some()); + // queue empty, both workers still active: split + assert!(hint.should_split()); + + // one worker finishes; only one active worker remains + assert!(source.pop_work().is_none()); + source.mark_finished(); + assert!(!hint.should_split()); + } + + #[test] + fn donated_morsels_are_popped_before_files() { + let source = SharedWorkSource::with_workers(vec![test_file()], 2); + source.donate_morsels(vec![Box::new(DummyMorsel) as Box].into_iter()); + + assert!(matches!(source.pop_work(), Some(WorkItem::Morsel(_)))); + assert!(matches!(source.pop_work(), Some(WorkItem::File(_)))); + assert!(source.pop_work().is_none()); + } + + #[test] + fn donate_surplus_keeps_first_morsel_local() { + let shared = SharedWorkSource::with_workers(vec![], 2); + let work_source = WorkSource::Shared(shared.clone()); + + let mut ready: VecDeque> = VecDeque::from([ + Box::new(DummyMorsel) as Box, + Box::new(DummyMorsel) as Box, + Box::new(DummyMorsel) as Box, + ]); + work_source.donate_surplus_morsels(&mut ready); + + assert_eq!(ready.len(), 1); + assert!(matches!(shared.pop_work(), Some(WorkItem::Morsel(_)))); + assert!(matches!(shared.pop_work(), Some(WorkItem::Morsel(_)))); + assert!(shared.pop_work().is_none()); } } diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 7b5066ca07a26..cc803594cda1b 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -39,6 +39,7 @@ use futures::future::BoxFuture; use futures::stream::BoxStream; use std::fmt::Debug; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; /// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es. @@ -66,6 +67,56 @@ pub trait Morselizer: Send + Sync + Debug { /// work, such as reading from the file. Any needed I/O should be done using /// [`MorselPlan::with_pending_planner`]. fn plan_file(&self, file: PartitionedFile) -> Result>; + + /// Install a [`SplitHint`] that planning may consult to decide whether a + /// file should be split into more, smaller morsels (for example + /// sub-row-group splits for Parquet). + /// + /// Implementations that do not support splitting may ignore this (the + /// default). + fn set_split_hint(&mut self, _hint: SplitHint) {} +} + +/// A hint that tells a [`Morselizer`] whether splitting remaining work into +/// more, smaller morsels would currently improve parallelism. +/// +/// Splitting a file into multiple morsels has overhead (e.g. a separate reader +/// and row filter per morsel), so it should only happen when sibling streams +/// would otherwise go idle: typically at the tail of a scan, when fewer work +/// items remain than there are streams consuming them. The hint is consulted +/// at planning time, so the decision reflects the state of the scan at that +/// moment rather than at plan creation. +#[derive(Clone, Default)] +pub struct SplitHint { + inner: Option bool + Send + Sync>>, +} + +impl SplitHint { + /// Create a hint backed by the given `should_split` callback. + pub fn new(should_split: impl Fn() -> bool + Send + Sync + 'static) -> Self { + Self { + inner: Some(Arc::new(should_split)), + } + } + + /// A hint that never asks for splitting (the default). + pub fn disabled() -> Self { + Self::default() + } + + /// Returns true if producing more, smaller morsels would currently improve + /// parallelism. + pub fn should_split(&self) -> bool { + self.inner.as_ref().is_some_and(|f| f()) + } +} + +impl Debug for SplitHint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SplitHint") + .field("enabled", &self.inner.is_some()) + .finish() + } } /// A Morsel Planner is responsible for creating morsels for a given scan. diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 7fff5b6b715ff..2b46098f11418 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -627,6 +627,12 @@ message ParquetOptions { uint64 max_predicate_cache_size = 33; } + // Target compressed bytes per morsel when splitting low-parallelism scan + // work; unset disables splitting. + oneof morsel_split_size_opt { + uint64 morsel_split_size = 38; + } + oneof max_row_group_bytes_opt { uint64 max_row_group_bytes = 37; } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 97cc9af230105..c5e19e9fd1e88 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1130,6 +1130,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + morsel_split_size: value.morsel_split_size_opt.map(|opt| match opt { + protobuf::parquet_options::MorselSplitSizeOpt::MorselSplitSize(v) => Some(v as usize), + }).unwrap_or(None), max_row_group_bytes: value.max_row_group_bytes_opt.and_then(|opt| match opt { protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => MaxRowGroupBytes::try_new(v as usize).ok(), }), diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 963faa5a3e9cb..9e098323bc187 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -6448,6 +6448,9 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.morsel_split_size_opt.is_some() { + len += 1; + } if self.max_row_group_bytes_opt.is_some() { len += 1; } @@ -6622,6 +6625,15 @@ impl serde::Serialize for ParquetOptions { } } } + if let Some(v) = self.morsel_split_size_opt.as_ref() { + match v { + parquet_options::MorselSplitSizeOpt::MorselSplitSize(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("morselSplitSize", ToString::to_string(&v).as_str())?; + } + } + } if let Some(v) = self.max_row_group_bytes_opt.as_ref() { match v { parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => { @@ -6711,6 +6723,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", + "morsel_split_size", + "morselSplitSize", "max_row_group_bytes", "maxRowGroupBytes", "coerce_int96_tz", @@ -6752,6 +6766,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, + MorselSplitSize, MaxRowGroupBytes, CoerceInt96Tz, } @@ -6808,6 +6823,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), + "morselSplitSize" | "morsel_split_size" => Ok(GeneratedField::MorselSplitSize), "maxRowGroupBytes" | "max_row_group_bytes" => Ok(GeneratedField::MaxRowGroupBytes), "coerceInt96Tz" | "coerce_int96_tz" => Ok(GeneratedField::CoerceInt96Tz), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -6862,6 +6878,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; + let mut morsel_split_size_opt__ = None; let mut max_row_group_bytes_opt__ = None; let mut coerce_int96_tz_opt__ = None; while let Some(k) = map_.next_key()? { @@ -7078,6 +7095,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } + GeneratedField::MorselSplitSize => { + if morsel_split_size_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("morselSplitSize")); + } + morsel_split_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MorselSplitSizeOpt::MorselSplitSize(x.0)); + } GeneratedField::MaxRowGroupBytes => { if max_row_group_bytes_opt__.is_some() { return Err(serde::de::Error::duplicate_field("maxRowGroupBytes")); @@ -7126,6 +7149,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, + morsel_split_size_opt: morsel_split_size_opt__, max_row_group_bytes_opt: max_row_group_bytes_opt__, coerce_int96_tz_opt: coerce_int96_tz_opt__, }) diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 93b97c4f1376c..ff93372200bd2 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -900,6 +900,12 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + /// Target compressed bytes per morsel when splitting low-parallelism scan + /// work; unset disables splitting. + #[prost(oneof = "parquet_options::MorselSplitSizeOpt", tags = "38")] + pub morsel_split_size_opt: ::core::option::Option< + parquet_options::MorselSplitSizeOpt, + >, #[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")] pub max_row_group_bytes_opt: ::core::option::Option< parquet_options::MaxRowGroupBytesOpt, @@ -968,6 +974,13 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + /// Target compressed bytes per morsel when splitting low-parallelism scan + /// work; unset disables splitting. + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum MorselSplitSizeOpt { + #[prost(uint64, tag = "38")] + MorselSplitSize(u64), + } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] pub enum MaxRowGroupBytesOpt { #[prost(uint64, tag = "37")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index a6fa13ca7479c..35ea8ab9e7707 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -938,6 +938,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + morsel_split_size_opt: value.morsel_split_size.map(|v| protobuf::parquet_options::MorselSplitSizeOpt::MorselSplitSize(v as u64)), max_row_group_bytes_opt: value.max_row_group_bytes.map(|v| protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v.get() as u64)), content_defined_chunking: Some((&value.content_defined_chunking).into()), }) diff --git a/datafusion/proto-models/src/generated/datafusion_proto_common.rs b/datafusion/proto-models/src/generated/datafusion_proto_common.rs index 93b97c4f1376c..ff93372200bd2 100644 --- a/datafusion/proto-models/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto-models/src/generated/datafusion_proto_common.rs @@ -900,6 +900,12 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + /// Target compressed bytes per morsel when splitting low-parallelism scan + /// work; unset disables splitting. + #[prost(oneof = "parquet_options::MorselSplitSizeOpt", tags = "38")] + pub morsel_split_size_opt: ::core::option::Option< + parquet_options::MorselSplitSizeOpt, + >, #[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")] pub max_row_group_bytes_opt: ::core::option::Option< parquet_options::MaxRowGroupBytesOpt, @@ -968,6 +974,13 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + /// Target compressed bytes per morsel when splitting low-parallelism scan + /// work; unset disables splitting. + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum MorselSplitSizeOpt { + #[prost(uint64, tag = "38")] + MorselSplitSize(u64), + } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] pub enum MaxRowGroupBytesOpt { #[prost(uint64, tag = "37")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 8e71cc926856c..1e4e13d500a70 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -456,6 +456,9 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + morsel_split_size_opt: global_options.global.morsel_split_size.map(|size| { + parquet_options::MorselSplitSizeOpt::MorselSplitSize(size as u64) + }), max_row_group_bytes_opt: global_options.global.max_row_group_bytes.map(|size| { parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(size.get() as u64) }), @@ -632,6 +635,13 @@ mod parquet { size, ) => *size as usize, }), + morsel_split_size: proto.morsel_split_size_opt.as_ref().map( + |opt| match opt { + parquet_options::MorselSplitSizeOpt::MorselSplitSize(size) => { + *size as usize + } + }, + ), max_row_group_bytes: proto .max_row_group_bytes_opt .as_ref() diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 8d334d8433284..5d2697b2dbb6f 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -258,6 +258,7 @@ datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 datafusion.execution.parquet.maximum_parallel_row_group_writers 1 datafusion.execution.parquet.metadata_size_hint 524288 +datafusion.execution.parquet.morsel_split_size 1048576 datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false @@ -415,6 +416,7 @@ datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. +datafusion.execution.parquet.morsel_split_size 1048576 (reading) Target size, in compressed bytes, of the morsels a file scan is split into when too little work remains to keep all output streams busy (the tail of a scan). Row groups larger than this are split at page-aligned row ranges and the surplus morsels are shared with idle sibling streams. If NULL, splitting is disabled. datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 7c6756a096309..f658931a82620 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -93,6 +93,7 @@ The following configuration settings are available: | datafusion.execution.parquet.coerce_int96_tz | NULL | (reading) Optional timezone applied to INT96 columns when `coerce_int96` is set. When `Some`, INT96 columns coerce to `Timestamp(, Some())` instead of the default `Timestamp(, None)`. Spark and other systems write INT96 values as UTC-adjusted instants, so callers that need the resulting Arrow type to be timezone-aware (e.g. for Spark `TimestampType` semantics) should set this to `"UTC"`. No effect when `coerce_int96` is `None`. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | +| datafusion.execution.parquet.morsel_split_size | 1048576 | (reading) Target size, in compressed bytes, of the morsels a file scan is split into when too little work remains to keep all output streams busy (the tail of a scan). Row groups larger than this are split at page-aligned row ranges and the surplus morsels are shared with idle sibling streams. If NULL, splitting is disabled. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in rows | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | From 7bb5a202ba268990bc2156624cd1578b1b6d2ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 12 Jun 2026 19:14:55 +0200 Subject: [PATCH 2/2] Keep idle siblings alive while a donor may still donate morsels A stream that found the shared work queue empty terminated immediately, so by the time a donor finished planning a large file and donated its surplus morsels, no sibling was left to steal them (and the split hint often reported a single active worker, suppressing the split entirely). Popping a file now registers a FileLease on the shared work source; while any lease is alive, an idle sibling yields Poll::Pending and re-polls instead of finishing, then steals donated morsels once the donor publishes them. The lease is released when the donation window closes (the first morsel starts streaming) or the file is abandoned. Co-Authored-By: Claude Fable 5 --- datafusion/datasource/src/file_stream/mod.rs | 77 ++++++++++ .../datasource/src/file_stream/scan_state.rs | 33 ++++- .../datasource/src/file_stream/work_source.rs | 135 +++++++++++++++--- 3 files changed, 220 insertions(+), 25 deletions(-) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index d976bf955dbb2..71a26089d4dc6 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -189,6 +189,7 @@ mod tests { use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::task::{Context, Poll}; use crate::file_stream::{ FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, @@ -1325,6 +1326,82 @@ mod tests { Ok(()) } + /// Verifies that an idle sibling waits while a donor is still planning a + /// shared file — which may yet be split into donated morsels — instead of + /// terminating, and then steals the donated morsel once it appears. + #[tokio::test] + async fn morsel_idle_sibling_waits_for_donor_morsel_donation() -> Result<()> { + // file1 needs an extra poll of I/O before planning produces two + // morsels; the surplus morsel is donated to the shared queue. + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan( + PendingPlannerBuilder::new(IoFutureId(1)) + .with_polls_to_resolve(PollsToResolve(1)), + ) + .add_plan( + MockPlanBuilder::new() + .with_morsel(MorselId(10), 101) + .with_morsel(MorselId(11), 102), + ) + .return_none(), + ) + // Declare a second, empty partition that can steal donated work. + .with_reads(vec![PartitionId(1)]); + + let config = test.test_config(); + let shared_work_source = config + .create_sibling_state() + .and_then(|state| state.as_ref().downcast_ref::().cloned()); + let metrics = ExecutionPlanMetricsSet::new(); + let mut donor = FileStreamBuilder::new(&config) + .with_partition(0) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(test.morselizer.clone())) + .with_metrics(&metrics) + .build()?; + let mut idle = FileStreamBuilder::new(&config) + .with_partition(1) + .with_shared_work_source(shared_work_source) + .with_morselizer(Box::new(test.morselizer.clone())) + .with_metrics(&metrics) + .build()?; + + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + // The donor pops the only shared file and blocks on planning I/O. + assert!(donor.poll_next_unpin(&mut cx).is_pending()); + + // The idle sibling finds the queue empty, but the donor may still + // split its file, so the sibling must wait rather than finish. + assert!( + idle.poll_next_unpin(&mut cx).is_pending(), + "idle sibling must wait for the in-flight donor instead of finishing" + ); + + // The donor finishes planning: it keeps the first morsel and donates + // the second to the shared queue. + let Poll::Ready(Some(donor_batch)) = donor.poll_next_unpin(&mut cx) else { + panic!("donor should produce its first batch"); + }; + assert_eq!(format_result(donor_batch), "Batch: 101"); + + // The idle sibling steals the donated morsel. + let Poll::Ready(Some(stolen_batch)) = idle.poll_next_unpin(&mut cx) else { + panic!("idle sibling should steal the donated morsel"); + }; + assert_eq!(format_result(stolen_batch), "Batch: 102"); + + // With all work consumed and no donors in flight, both streams finish. + assert!(matches!(donor.poll_next_unpin(&mut cx), Poll::Ready(None))); + assert!(matches!(idle.poll_next_unpin(&mut cx), Poll::Ready(None))); + + Ok(()) + } + /// Tests how one or more `FileStream`s consume morselized file work. #[derive(Clone)] struct FileStreamMorselTest { diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 3ea9aac23d2c5..8e1b27cee857c 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -26,7 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard; use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; -use super::work_source::{WorkItem, WorkSource}; +use super::work_source::{FileLease, PopResult, WorkItem, WorkSource}; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -81,6 +81,12 @@ pub(super) struct ScanState { /// Once the I/O completes, yields the next planner and is pushed back /// onto `ready_planners`. pending_planner: Option, + /// Lease on the file currently being planned, when it was popped from a + /// shared work source. While held, idle siblings keep polling for + /// morsels this file may donate instead of terminating. Released once + /// the first morsel starts streaming (the donation window has closed) or + /// the file is abandoned. + current_file_lease: Option, /// Metrics for the active scan queues. metrics: FileStreamMetrics, } @@ -102,6 +108,7 @@ impl ScanState { ready_morsels: Default::default(), reader: None, pending_planner: None, + current_file_lease: None, metrics, } } @@ -146,6 +153,7 @@ impl ScanState { return match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -221,6 +229,10 @@ impl ScanState { self.metrics.time_scanning_until_data.start(); self.metrics.time_scanning_total.start(); self.reader = Some(morsel.into_stream()); + // A morsel is now streaming, so the window in which this file + // could donate surplus morsels has closed. Release the lease so + // idle siblings can exit instead of waiting on this stream. + self.current_file_lease = None; return ScanAndReturn::Continue; } @@ -256,6 +268,7 @@ impl ScanState { Ok(None) => { self.metrics.files_processed.add(1); self.metrics.time_opening.stop(); + self.current_file_lease = None; ScanAndReturn::Continue } Err(err) => { @@ -264,6 +277,7 @@ impl ScanState { match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -276,12 +290,22 @@ impl ScanState { // morsels from sibling streams are ready for decoding immediately; // files still need to be planned. let part_file = match self.work_source.pop_work() { - Some(WorkItem::File(part_file)) => *part_file, - Some(WorkItem::Morsel(morsel)) => { + PopResult::Ready(WorkItem::File(part_file), lease) => { + self.current_file_lease = lease; + *part_file + } + PopResult::Ready(WorkItem::Morsel(morsel), _) => { self.ready_morsels.push_back(morsel); return ScanAndReturn::Continue; } - None => { + PopResult::Pending => { + // A sibling is still planning a shared file and may donate + // morsels; re-schedule ourselves to re-check the queues + // instead of terminating. + cx.waker().wake_by_ref(); + return ScanAndReturn::Return(Poll::Pending); + } + PopResult::Done => { // This stream terminates and can no longer pick up shared // work. self.work_source.mark_finished(); @@ -301,6 +325,7 @@ impl ScanState { self.metrics.file_open_errors.add(1); self.metrics.time_opening.stop(); self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index d874a204ccd71..a8522ea25b5a9 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -49,11 +49,17 @@ pub(super) enum WorkSource { impl WorkSource { /// Pop the next work item from this work source. - pub(super) fn pop_work(&mut self) -> Option { + /// + /// Returns [`PopResult::Pending`] for [`WorkSource::Shared`] when the + /// queues are empty but a sibling is still planning a file that may yet + /// be split and donated. The caller must re-schedule its waker and yield + /// instead of terminating. + pub(super) fn pop_work(&mut self) -> PopResult { match self { - Self::Local(files) => { - files.pop_front().map(|file| WorkItem::File(Box::new(file))) - } + Self::Local(files) => match files.pop_front() { + Some(file) => PopResult::Ready(WorkItem::File(Box::new(file)), None), + None => PopResult::Done, + }, Self::Shared(shared) => shared.pop_work(), } } @@ -91,6 +97,42 @@ impl WorkSource { } } +/// Outcome of a pop attempt against a [`WorkSource`]. +pub(super) enum PopResult { + /// Work popped. The optional [`FileLease`] must be held while the file + /// may still donate morsels; while it is alive, idle siblings keep + /// polling instead of treating the shared source as drained. + Ready(WorkItem, Option), + /// No work currently available, but a sibling is still planning a file + /// that may donate morsels. The caller must re-schedule its waker and + /// yield. + Pending, + /// No work available and no donors in flight — fully drained. + Done, +} + +/// RAII guard for a file popped from a [`SharedWorkSource`]. +/// +/// While alive, idle sibling streams keep polling for donated morsels +/// instead of terminating: the holder is still planning the file and may yet +/// split it and donate the surplus. Dropped once the donation window closes +/// (the first morsel starts streaming) or the file is abandoned. +pub(super) struct FileLease { + inner: Arc, +} + +impl std::fmt::Debug for FileLease { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileLease").finish_non_exhaustive() + } +} + +impl Drop for FileLease { + fn drop(&mut self) { + self.inner.queues.lock().in_flight -= 1; + } +} + /// Shared source of work for sibling `FileStream`s /// /// The queue is created once per execution and shared by all reorderable @@ -123,6 +165,10 @@ struct WorkQueues { /// Number of sibling streams that have already run out of work and /// finished. finished_workers: usize, + /// Number of popped files whose [`FileLease`] is still alive. Non-zero + /// means a donor is still planning a file and may yet donate morsels, so + /// idle siblings must keep polling rather than terminate. + in_flight: usize, } impl SharedWorkSource { @@ -162,16 +208,28 @@ impl SharedWorkSource { /// Pop the next work item from the shared queues. /// /// Donated morsels are preferred over unopened files so partially-planned - /// work finishes first. - fn pop_work(&self) -> Option { + /// work finishes first. Popping a file registers a [`FileLease`]: while + /// it is alive, siblings that find the queues empty get + /// [`PopResult::Pending`] (the file may still be split and donated) + /// rather than [`PopResult::Done`]. + fn pop_work(&self) -> PopResult { let mut queues = self.inner.queues.lock(); if let Some(morsel) = queues.morsels.pop_front() { - return Some(WorkItem::Morsel(morsel)); + return PopResult::Ready(WorkItem::Morsel(morsel), None); + } + if let Some(file) = queues.files.pop_front() { + queues.in_flight += 1; + return PopResult::Ready( + WorkItem::File(Box::new(file)), + Some(FileLease { + inner: Arc::clone(&self.inner), + }), + ); } - queues - .files - .pop_front() - .map(|file| WorkItem::File(Box::new(file))) + if queues.in_flight > 0 { + return PopResult::Pending; + } + PopResult::Done } /// Record that a consuming stream has finished (ran out of work or hit @@ -236,17 +294,17 @@ mod tests { // 3 files remaining >= 2 workers: no split assert!(!hint.should_split()); - assert!(source.pop_work().is_some()); - assert!(source.pop_work().is_some()); + assert!(matches!(source.pop_work(), PopResult::Ready(..))); + assert!(matches!(source.pop_work(), PopResult::Ready(..))); // 1 file remaining < 2 workers: split assert!(hint.should_split()); - assert!(source.pop_work().is_some()); + assert!(matches!(source.pop_work(), PopResult::Ready(..))); // queue empty, both workers still active: split assert!(hint.should_split()); // one worker finishes; only one active worker remains - assert!(source.pop_work().is_none()); + assert!(matches!(source.pop_work(), PopResult::Done)); source.mark_finished(); assert!(!hint.should_split()); } @@ -256,9 +314,38 @@ mod tests { let source = SharedWorkSource::with_workers(vec![test_file()], 2); source.donate_morsels(vec![Box::new(DummyMorsel) as Box].into_iter()); - assert!(matches!(source.pop_work(), Some(WorkItem::Morsel(_)))); - assert!(matches!(source.pop_work(), Some(WorkItem::File(_)))); - assert!(source.pop_work().is_none()); + assert!(matches!( + source.pop_work(), + PopResult::Ready(WorkItem::Morsel(_), None) + )); + assert!(matches!( + source.pop_work(), + PopResult::Ready(WorkItem::File(_), Some(_)) + )); + assert!(matches!(source.pop_work(), PopResult::Done)); + } + + #[test] + fn empty_queues_with_live_file_lease_are_pending() { + let source = SharedWorkSource::with_workers(vec![test_file()], 2); + let PopResult::Ready(WorkItem::File(_), Some(lease)) = source.pop_work() else { + panic!("expected a file with a lease"); + }; + + // The donor is still planning its file and may donate morsels, so an + // idle sibling must keep polling rather than terminate. + assert!(matches!(source.pop_work(), PopResult::Pending)); + + // The donor splits the file, donates a morsel, and (once the first + // morsel starts streaming) releases its lease. + source.donate_morsels(vec![Box::new(DummyMorsel) as Box].into_iter()); + drop(lease); + + assert!(matches!( + source.pop_work(), + PopResult::Ready(WorkItem::Morsel(_), None) + )); + assert!(matches!(source.pop_work(), PopResult::Done)); } #[test] @@ -274,8 +361,14 @@ mod tests { work_source.donate_surplus_morsels(&mut ready); assert_eq!(ready.len(), 1); - assert!(matches!(shared.pop_work(), Some(WorkItem::Morsel(_)))); - assert!(matches!(shared.pop_work(), Some(WorkItem::Morsel(_)))); - assert!(shared.pop_work().is_none()); + assert!(matches!( + shared.pop_work(), + PopResult::Ready(WorkItem::Morsel(_), None) + )); + assert!(matches!( + shared.pop_work(), + PopResult::Ready(WorkItem::Morsel(_), None) + )); + assert!(matches!(shared.pop_work(), PopResult::Done)); } }