feat(parquet): runtime row-group early stop via TopK dynamic filter#22450
zhuqi-lucas wants to merge 1 commit into
Two CI failures on PR apache#22450:

1. **cargo doc**: broken intra-doc link in `ParquetFileMetrics::row_groups_pruned_dynamic_filter`. Switch from ``[`row_groups_pruned_statistics`]`` to ``[`Self::row_groups_pruned_statistics`]`` so rustdoc can resolve it.
2. **sqllogictest substrait round-trip**: adding `dynamic_rg_pruning=eligible` to `ParquetSource`'s `fmt_extra` output shifted every `EXPLAIN` line that already showed a `DynamicFilter` predicate. Add the marker to 13 SLT expectations:
   - clickbench, explain_analyze, limit, limit_pruning, dynamic_filter_pushdown_config, preserve_file_partitioning, projection_pushdown, push_down_filter_parquet, push_down_filter_regression, repartition_subset_satisfaction, sort_pushdown, statistics_registry, topk
   - 134 marker insertions total, all on `DataSourceExec:` lines whose predicate contains `DynamicFilter [`.

Two summary-level analyze tests also need the new `row_groups_pruned_dynamic_filter=0` counter in their metrics block (`limit_pruning.slt`, `dynamic_filter_pushdown_config.slt`). Dev-level analyze output elides zero-valued counters, so the other files don't need it.

No behavior change beyond what was already in the previous commit.
run benchmarks

🤖 Benchmark running (GKE): comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) using clickbench_partitioned.

🤖 Benchmark running (GKE): comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) using tpcds.

🤖 Benchmark running (GKE): comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) using tpch.
🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch.

run benchmark sort_pushdown_inexact
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch.

🤖 Benchmark running (GKE): comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) using sort_pushdown_inexact.
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch.
Pull request overview
This PR adds runtime row-group pruning for Parquet scans driven by TopK’s dynamic filter, closing the gap where row groups selected at file open couldn’t be re-pruned after the TopK threshold tightens during execution.
Changes:

- Introduces a runtime `RowGroupPruner` that re-evaluates a dynamic predicate at decoder-run boundaries and skips row groups proven unreachable.
- Forces per-row-group decoder splitting when the predicate is dynamic so the runtime pruner has a boundary at every RG.
- Adds observability: `dynamic_rg_pruning=eligible` in `EXPLAIN` and a new metric `row_groups_pruned_dynamic_filter` in `EXPLAIN ANALYZE`, plus tests/SLTs updated accordingly.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| datafusion/datasource-parquet/src/push_decoder.rs | Adds RowGroupPruner, tracks row-group indices per decoder run, and skips prunable runs at runtime. |
| datafusion/datasource-parquet/src/opener/mod.rs | Forces per-RG runs for dynamic predicates; wires pending runs + runtime pruner into PushDecoderStreamState. |
| datafusion/datasource-parquet/src/access_plan.rs | Extends split_runs with force_per_row_group to avoid coalescing runs for dynamic predicates. |
| datafusion/datasource-parquet/src/source.rs | Adds dynamic_rg_pruning=eligible marker in fmt_extra and unit tests for the marker. |
| datafusion/datasource-parquet/src/row_group_filter.rs | Exposes RowGroupPruningStatistics to reuse stats adapter for runtime pruning. |
| datafusion/datasource-parquet/src/metrics.rs | Adds row_groups_pruned_dynamic_filter metric to ParquetFileMetrics. |
| datafusion/core/tests/parquet/mod.rs | Adds helper to read row_groups_pruned_dynamic_filter from metrics. |
| datafusion/core/tests/parquet/dynamic_row_group_pruning.rs | New integration tests validating metric fires for TopK and stays quiet otherwise. |
| datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt | New SLT covering both EXPLAIN marker and EXPLAIN ANALYZE metric value. |
| datafusion/sqllogictest/test_files/topk.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/statistics_registry.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/push_down_filter_regression.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/push_down_filter_parquet.slt | Updates expected plans/metrics to include dynamic_rg_pruning=eligible and (where relevant) the new counter. |
| datafusion/sqllogictest/test_files/projection_pushdown.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/preserve_file_partitioning.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/limit.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/limit_pruning.slt | Updates expected metrics to include row_groups_pruned_dynamic_filter=0 plus eligibility marker. |
| datafusion/sqllogictest/test_files/explain_analyze.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt | Updates expected plans/metrics to include eligibility marker and row_groups_pruned_dynamic_filter=0 where applicable. |
| datafusion/sqllogictest/test_files/clickbench.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
Comments suppressed due to low confidence (1)
datafusion/datasource-parquet/src/access_plan.rs:458
`split_runs` computes `row_group_needs_filter` as `!fully_matched` without considering the `needs_filter` argument. When `force_per_row_group=true` and the scan has no row filter (`needs_filter=false`), this will still mark all runs as `needs_filter=true`, causing the opener to treat them as filtered runs (e.g. attempting to fetch row filters / applying predicate-cache settings) even though no row-level filter exists. `row_group_needs_filter` should be derived as `needs_filter && !fully_matched` so the run metadata stays consistent with the caller's capabilities.
```rust
for (idx, (access, fully_matched)) in
    row_groups.into_iter().zip(fully_matched).enumerate()
{
    if !access.should_scan() {
        continue;
    }
    let row_group_needs_filter = !fully_matched;
    // Coalesce consecutive RGs into a run only when (a) they share
    // the same filter requirement and (b) we're not forcing per-RG
    // splitting for runtime pruning.
    let can_coalesce = !force_per_row_group;
    if can_coalesce
        && let Some(run) = runs
            .last_mut()
            .filter(|run| run.needs_filter == row_group_needs_filter)
    {
        run.access_plan.set(idx, access);
        if fully_matched {
            run.access_plan.mark_fully_matched(idx);
        }
    } else {
        let mut run_plan = ParquetAccessPlan::new_none(num_row_groups);
        run_plan.set(idx, access);
        if fully_matched {
            run_plan.mark_fully_matched(idx);
        }
        runs.push(RowGroupRun::new(row_group_needs_filter, run_plan));
    }
```
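A minimal, self-contained sketch of the fix Copilot suggests, using simplified hypothetical types (`Run` with a plain `Vec<usize>` of row-group indices, boolean slices instead of `ParquetAccessPlan` and `RowGroupAccess`) rather than the real `RowGroupRun` API. The only substantive change from the excerpt is that the per-row-group requirement becomes `needs_filter && !fully_matched`, so a scan without a row filter never yields a run flagged as filtered, even when per-RG splitting is forced.

```rust
/// Hypothetical, simplified stand-in for `RowGroupRun`.
#[derive(Debug, Clone, PartialEq)]
pub struct Run {
    pub needs_filter: bool,
    pub row_groups: Vec<usize>,
}

/// Split the scanned row groups into runs.
///
/// `should_scan[i]` / `fully_matched[i]` describe row group `i`;
/// `needs_filter` says whether the scan has a row-level filter at all.
pub fn split_runs(
    should_scan: &[bool],
    fully_matched: &[bool],
    needs_filter: bool,
    force_per_row_group: bool,
) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for (idx, (&scan, &matched)) in should_scan.iter().zip(fully_matched).enumerate() {
        if !scan {
            continue;
        }
        // The fix: a row group needs filtering only if the scan has a
        // row filter AND the row group is not already fully matched.
        let rg_needs_filter = needs_filter && !matched;
        // Extend the previous run only when per-RG splitting is not forced
        // and the previous run has the same filter requirement.
        let coalesce = !force_per_row_group
            && runs.last().is_some_and(|run| run.needs_filter == rg_needs_filter);
        if coalesce {
            let last = runs.len() - 1;
            runs[last].row_groups.push(idx);
        } else {
            runs.push(Run { needs_filter: rg_needs_filter, row_groups: vec![idx] });
        }
    }
    runs
}
```

With `needs_filter=false` and `force_per_row_group=true`, every run comes out as its own single-RG run with `needs_filter=false`, which is the consistency property the review asks for.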
🤖 Benchmark completed (GKE): sort_pushdown_inexact, base (merge-base) vs. branch.
Per Copilot review on apache#22450: `RowGroupPruner` was using a single `predicate_creation_errors` counter for both predicate construction (`build_pruning_predicate`) AND predicate evaluation (`PruningPredicate::prune`) failures. The log message also said "Ignoring error building..." when the failure was during evaluation. This misattributed evaluation failures and made the metric semantics inconsistent with the static row-group pruning path in `RowGroupAccessPlanFilter::prune_by_statistics`, which already separates the two.

`RowGroupPruner::new` now takes both counters:

- `predicate_creation_errors`: bumped on `build_pruning_predicate` failures. Wired to `prepared.predicate_creation_errors` from the opener, the same field the static path uses.
- `predicate_evaluation_errors`: bumped on `PruningPredicate::prune` failures. Wired to `prepared.file_metrics.predicate_evaluation_errors`, the same field the static `prune_by_statistics` path uses, so the two paths accumulate into a shared counter.

The error log message is updated to say "evaluating" so the metric and the log agree.
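A minimal sketch of the split error accounting, under stated assumptions: `build` stands in for `build_pruning_predicate`, `evaluate` for `PruningPredicate::prune` on one row group, and two plain `AtomicUsize` fields stand in for the DataFusion metrics. `Counters` and `can_skip` are hypothetical names. The sketch also assumes that either failure keeps the row group, so an error can cost performance but never drops rows.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-ins for the two metrics.
#[derive(Default)]
pub struct Counters {
    pub predicate_creation_errors: AtomicUsize,
    pub predicate_evaluation_errors: AtomicUsize,
}

/// Returns `true` if the row group may be skipped.
/// `evaluate` returns `Ok(true)` when the row group *may* contain matches.
pub fn can_skip<P>(
    counters: &Counters,
    build: impl FnOnce() -> Result<P, String>,
    evaluate: impl FnOnce(&P) -> Result<bool, String>,
) -> bool {
    let predicate = match build() {
        Ok(p) => p,
        Err(_) => {
            // Construction failed: count it as a creation error, keep the RG.
            counters.predicate_creation_errors.fetch_add(1, Ordering::Relaxed);
            return false;
        }
    };
    match evaluate(&predicate) {
        Ok(may_match) => !may_match,
        Err(_) => {
            // Evaluation failed: count it separately, keep the RG.
            counters.predicate_evaluation_errors.fetch_add(1, Ordering::Relaxed);
            false
        }
    }
}
```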
run benchmark sort_pushdown_inexact

🤖 Benchmark running (GKE): comparing feat/topk-rg-level-dynamic-pruning (0828f1b) to a8f03fd (merge-base) using sort_pushdown_inexact.

🤖 Benchmark completed (GKE): sort_pushdown_inexact, base (merge-base) vs. branch.

run benchmark topk_tpch

🤖 Benchmark running (GKE): comparing feat/topk-rg-level-dynamic-pruning (0828f1b) to a8f03fd (merge-base) using topk_tpch.

🤖 Benchmark completed (GKE): topk_tpch, base (merge-base) vs. branch.
| Query | HEAD | feat_topk-rg-level-dynamic-pruning | Change |
|---|---|---|---|
| Q1 | 2.14 / 2.74 ±0.76 / 4.10 ms | 2.12 / 2.79 ±0.68 / 4.02 ms | no change |
| Q2 | 10.66 / 11.36 ±0.68 / 12.23 ms | 2.81 / 3.61 ±0.87 / 4.72 ms | +3.15x faster |
| Q3 | 31.77 / 32.15 ±0.43 / 32.83 ms | 31.71 / 31.92 ±0.16 / 32.18 ms | no change |
| Q4 | 11.83 / 12.29 ±0.77 / 13.82 ms | 3.13 / 3.25 ±0.13 / 3.48 ms | +3.78x faster |
| Q5 | 9.94 / 10.14 ±0.18 / 10.46 ms | 9.95 / 10.02 ±0.05 / 10.09 ms | no change |
| Q6 | 17.19 / 17.39 ±0.15 / 17.56 ms | 17.11 / 17.36 ±0.37 / 18.09 ms | no change |
| Q7 | 37.07 / 38.08 ±1.17 / 40.08 ms | 37.00 / 37.41 ±0.37 / 38.07 ms | no change |
| Q8 | 28.13 / 28.59 ±0.60 / 29.71 ms | 6.86 / 7.16 ±0.42 / 7.98 ms | +3.99x faster |
| Q9 | 35.34 / 36.86 ±1.54 / 38.77 ms | 8.36 / 8.50 ±0.08 / 8.60 ms | +4.34x faster |
| Q10 | 54.13 / 55.29 ±1.83 / 58.93 ms | 12.77 / 13.00 ±0.45 / 13.89 ms | +4.25x faster |
| Q11 | 3.75 / 3.91 ±0.11 / 4.05 ms | 3.82 / 4.08 ±0.31 / 4.68 ms | no change |

cc @alamb @adriangb @Dandandan
Nice, impressive 🚀🚀🚀

Thanks @adriangb! On the #22144 overlap and morsel intersection: agreed on both; will trace #22144 for a shared abstraction in a follow-up, and morsels would be the natural cross-file extension point. Have a good trip :)
Per @adriangb's review on apache#22450: dynamic filters that are already complete at scan startup (the typical hash-join case, where the build side finishes before the probe scan starts) gain nothing from per-RG decoder splitting. The static `PruningPredicate` path at file open has already used the final filter value, and every subsequent `RowGroupPruner` check returns the same answer. We were paying a per-RG decoder allocation cost for no gain.

Tighten the gate in the opener from `is_dynamic_physical_expr(p)` to `is_dynamic_physical_expr(p) && !is_filter_complete(p)`, where `is_filter_complete` tree-walks the predicate and returns true only when every `DynamicFilterPhysicalExpr` node has been `mark_complete()`d. This keeps splitting on for TopK (never complete during the scan) and for concurrent hash joins where the build finishes after the probe has started reading (incomplete at scan startup → still expected to tighten). Filters that are already complete at scan start fall through to the existing single-run path.

Five new unit tests cover the helper's tree-walk semantics:

- static predicate (vacuously complete)
- single incomplete dynamic filter
- single complete dynamic filter (post `mark_complete()`)
- composite predicate with both incomplete and complete nodes → incomplete
- composite predicate with both nodes complete → complete

134 parquet unit + 204 integration + all SLT pass; clippy `-D warnings` clean.
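The helper's tree-walk semantics can be illustrated with a toy expression type. This is a sketch, not the DataFusion code: `Expr` and its variants are hypothetical, whereas the real helper walks `Arc<dyn PhysicalExpr>` children and checks `DynamicFilterPhysicalExpr` nodes. The `is_filter_complete` cases mirror the five unit tests listed above, and `should_split_per_rg` is a hypothetical name for the tightened gate.

```rust
/// Hypothetical toy predicate tree.
pub enum Expr {
    /// Any non-dynamic leaf (column, literal, comparison, ...).
    Static,
    /// A dynamic filter node; `complete` mirrors `mark_complete()` having been called.
    Dynamic { complete: bool },
    /// A composite node such as `AND` / `OR`.
    Composite(Vec<Expr>),
}

/// True only when every dynamic node in the tree is complete.
/// A purely static tree is vacuously complete.
pub fn is_filter_complete(expr: &Expr) -> bool {
    match expr {
        Expr::Static => true,
        Expr::Dynamic { complete } => *complete,
        Expr::Composite(children) => children.iter().all(is_filter_complete),
    }
}

/// True if any node in the tree is a dynamic filter.
fn contains_dynamic(expr: &Expr) -> bool {
    match expr {
        Expr::Static => false,
        Expr::Dynamic { .. } => true,
        Expr::Composite(children) => children.iter().any(contains_dynamic),
    }
}

/// The tightened gate: split per row group only while a dynamic filter
/// in the predicate may still change.
pub fn should_split_per_rg(expr: &Expr) -> bool {
    contains_dynamic(expr) && !is_filter_complete(expr)
}
```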
I did a little experiment: #22460 |
…ange detection (apache#22460)

Extracted from a design discussion around the duplicated "does this filter have a dynamic portion that might change?" / "has the filter changed?" patterns (e.g. apache#22450, `FilePruner`).

## Rationale for this change

`DynamicFilterPhysicalExpr` has a rich *producer* API (`update()`, `mark_complete()`, `wait_update()`, `wait_complete()`), but *consumers* that hold a predicate which *contains* dynamic filters only had a bare, recursive `snapshot_generation() -> u64`. Call sites hand-rolled the same boilerplate around it: store a `last_generation`, recompute `snapshot_generation(&predicate)` (a full tree walk) on **every** check, diff it, and rebuild an expensive `PruningPredicate` on change. `FilePruner` did exactly this, and none of these consumers exploited `mark_complete()`.

This adds a small consumer-side counterpart so the pattern lives in one place, driven by the existing `watch` channel rather than by re-walking the tree. This immediately eliminates some tree traversals (we were constantly traversing the expression tree to check whether any filters had updated). Long term, I hope this makes changes like apache#22450 easier.

## What changes are included in this PR?

**New public API (`datafusion_physical_expr`):**

- `DynamicFilterTracking` (`classify` → `Static` / `AllComplete` / `Watching`, plus `contains_dynamic_filter` / `watcher`) and `DynamicFilterTracker` (`changed`). A tracker walks a (possibly composite) predicate **once**, subscribes to every still-incomplete dynamic filter, and answers `changed()` by polling only that shrinking set: steady state is one atomic load per filter, no tree walk, and no lock until something actually moves.
- Lives in the `expressions::dynamic_filters` module (`dynamic_filters.rs` → `dynamic_filters/mod.rs`, tracker in `dynamic_filters/tracker.rs`). The subscription plumbing (`subscribe`, `DynamicFilterSubscription`, `DynamicFilterChange`, `observe`, `is_complete`) is `pub(crate)`; test-only constructors are `#[cfg(test)]`.

**Consumers:**

- `FilePruner` is driven by `DynamicFilterTracking` instead of `snapshot_generation` polling, and now decides its own existence in `try_new` (a static predicate with no usable stats builds no pruner).
- The Parquet opener skips wrapping the scan in `EarlyStoppingStream` when nothing can change, and no longer needs an "is it dynamic?" gate.

## Are these changes tested?

Yes: unit tests for the tracker (classification, detect-update-once, `mark_complete` is not a change, coalesced update+complete, multi-filter), plus the existing `datafusion-pruning` / `datafusion-datasource-parquet` suites (incl. the static/dynamic/partition opener pruning test) pass unchanged.

## Are there any user-facing changes?

New public API as above (additive). One **deprecation** and one behavior change, both documented in the [DataFusion 55.0.0 upgrade guide](docs/source/library-user-guide/upgrading/55.0.0.md):

- `datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr` is **deprecated (since 55.0.0)**: downcast to `DynamicFilterPhysicalExpr` or use `DynamicFilterTracking`. (`snapshot_generation` itself is unchanged; it still backs the FFI vtable and proto roundtrip.)
- `FilePruner::try_new` now returns `None` for a purely static predicate over a file with no usable column statistics (previously `Some` whenever a statistics struct was present).

## Followups

I noticed a possible follow-up gate refinement, tracked in apache#22495. This also opens up the possibility to deprecate / remove the `snapshot` / `generation` machinery from the public physical expr APIs. These new APIs (the watchers, tracker) subsume much of the functionality, and I don't think we want to add `PhysicalExpr::watch`. And after several releases, the only thing using it right now is dynamic filters, i.e. no other legitimate use case has materialized.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
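A minimal sketch of the consumer-side idea: walk once, keep only the still-incomplete filters, then poll that shrinking set. It is a simplification, not the `DynamicFilterTracker` API: the hypothetical `Filter` type uses an `AtomicU64` generation plus an `AtomicBool` completion flag in place of the real `watch` channel subscription, and `Tracker` reads two atomics per filter rather than one. It reproduces the tested behaviours listed above: an update is reported once, completion alone is not a change, and a coalesced update+complete is reported and then dropped.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical producer side: a dynamic filter's version and completion flag.
#[derive(Default)]
pub struct Filter {
    generation: AtomicU64,
    complete: AtomicBool,
}

impl Filter {
    pub fn update(&self) {
        self.generation.fetch_add(1, Ordering::Release);
    }
    pub fn mark_complete(&self) {
        self.complete.store(true, Ordering::Release);
    }
}

/// Hypothetical consumer side: subscribes once, then polls a shrinking set.
pub struct Tracker {
    /// (filter, last generation seen)
    watched: Vec<(Arc<Filter>, u64)>,
}

impl Tracker {
    /// Walk the predicate's filters once and keep only the incomplete ones.
    pub fn new(filters: &[Arc<Filter>]) -> Self {
        let watched = filters
            .iter()
            .filter(|f| !f.complete.load(Ordering::Acquire))
            .map(|f| (Arc::clone(f), f.generation.load(Ordering::Acquire)))
            .collect();
        Tracker { watched }
    }

    /// True if any watched filter changed since the last call.
    /// Completed filters are dropped from the watched set.
    pub fn changed(&mut self) -> bool {
        let mut changed = false;
        self.watched.retain_mut(|(filter, seen)| {
            // Read `complete` before `generation` so an update that happened
            // just before `mark_complete()` is not missed.
            let complete = filter.complete.load(Ordering::Acquire);
            let generation = filter.generation.load(Ordering::Acquire);
            if generation != *seen {
                *seen = generation;
                changed = true;
            }
            !complete
        });
        changed
    }

    pub fn watching(&self) -> usize {
        self.watched.len()
    }
}
```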
Branch force-pushed from cdd9b25 to 458db12.
Thanks! I feel this may be simplified even further by apache/arrow-rs#9968?

Nice @adriangb, that does look much cleaner.

fwiw that should go out in the next minor release, so 2-4 weeks until it's available in DataFusion.

Good, I will wait for it.

Happy to see #22744 is ready; I will rework this after it is merged.

Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
…+ into_builder pruning

Replaces the upfront "build one `ParquetPushDecoder` per run" design with a single live decoder driven via `try_next_reader`, plus a lightweight `VecDeque<RgPlanEntry>` tracking the remaining row groups. At each RG boundary the runtime pruner checks the next RG's stats against the current dynamic filter; if that RG can be pruned, we drop the head of the plan and rebuild the decoder via `ParquetPushDecoder::into_builder` (arrow-rs 59.0.0 via apache#22744) with a tightened `with_row_groups` list. Per adriangb's suggestion on apache#22450.

Wins over the previous multi-decoder design:

- No wasted decoder construction for pruned runs (the old code eagerly built N decoders, then discarded the ones the pruner rejected).
- Per-RG pruning granularity instead of per-run (the old code could only skip whole runs; consecutive prunable RGs split across runs paid the boundary tax).
- A single mid-flight decoder reuses buffered bytes across `into_builder` rebuilds (arrow-rs preserves them).

Critical correctness fix during development: the `rg_plan` must use `prepared_access_plan.row_group_indexes` (the POST-reorder list emitted by `ParquetAccessPlan::prepare` → `reorder_by_statistics` + reverse), not the natural-order `access_plan.row_group_indexes()` captured before `prepare`. The reorder runs whenever `sort_order_for_reorder` is set; if the pruner consults RG metadata in natural order while the decoder yields readers in stats-optimal order, it checks the wrong stats and either drops live rows or rebuilds a decoder over the wrong RGs. The regression is covered by the new `dynamic_rg_pruning_handles_sort_pushdown_reorder` integration test, which was written to fail under the pre-fix code and pass under the fix, double-verified by temporarily reintroducing the buggy ordering during review.

Trade-off accepted, with follow-up: the OLD design used `split_runs` to build separate decoders for fully-matched vs. needs-filter runs, so fully-matched RGs paid zero row-filter evaluation. With one shared decoder the `row_filter` stays installed across all RGs, and arrow-rs has no public API to clear it once set (only the `with_row_filter` setter exists). Net impact is small: decode (the dominant per-row cost) still runs on fully-matched RGs anyway; only the `row_filter.evaluate` call is wasted. Restoring the optimization cleanly requires either an upstream `ParquetPushDecoderBuilder::without_row_filter` or re-introducing the per-section decoder split inside the new state machine. Tracked as follow-up.

Net diff: -107 lines. Dead code from the multi-decoder path (`ParquetAccessPlan::split_runs` + `RowGroupRun` + `has_fully_matched` + `RowFilterGenerator::has_row_filter` + four `split_runs` unit tests) is removed.

All affected tests pass: 142 datasource-parquet lib tests + 206 parquet_integration (including the new regression test) + SLT sweep across dynamic_row_group_pruning, sort_pushdown, limit_pruning, push_down_filter_regression, clickbench, projection_pushdown, push_down_filter_parquet, dynamic_filter_pushdown_config, preserve_file_partitioning, limit.
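The iteration model can be simulated without arrow-rs to show what the pruner does at each boundary and why the plan has to be in decoder order. This sketch assumes an `ORDER BY v DESC LIMIT k` TopK whose threshold is the smallest value currently kept, and treats a row group as unreachable when its max statistic is `<=` that threshold. `RowGroup`, `ScanStats`, and `scan` are hypothetical, and a counter stands in for `into_builder().with_row_groups(..).build()`; consecutive prunable heads are dropped together and counted as one rebuild, which is a simplification. Because `order` is both what the "decoder" visits and what the "pruner" inspects, the statistics consulted always belong to the row group that would be decoded next, which is the invariant behind the correctness fix.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// Hypothetical row group: its max statistic plus its values.
pub struct RowGroup {
    pub max: i64,
    pub values: Vec<i64>,
}

/// Outcome of the simulated scan.
pub struct ScanStats {
    pub top: Vec<i64>,       // the k largest values, descending
    pub decoded: Vec<usize>, // row groups actually decoded, in order
    pub pruned: usize,       // row groups skipped at runtime
    pub rebuilds: usize,     // simulated decoder rebuilds
}

/// Simulate `ORDER BY v DESC LIMIT k`, visiting row groups in `order`
/// (which must be the order the decoder will yield them).
pub fn scan(row_groups: &[RowGroup], order: &[usize], k: usize) -> ScanStats {
    let mut rg_plan: VecDeque<usize> = order.iter().copied().collect();
    // Min-heap of the current top-k; its minimum is the TopK threshold.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::new();
    let (mut decoded, mut pruned, mut rebuilds) = (Vec::new(), 0, 0);

    while let Some(rg) = rg_plan.pop_front() {
        decoded.push(rg);
        for &v in &row_groups[rg].values {
            heap.push(Reverse(v));
            if heap.len() > k {
                heap.pop(); // evict the smallest kept value
            }
        }
        // Row-group boundary: consult the "pruner" against the plan head.
        let mut dropped_any = false;
        if heap.len() == k {
            if let Some(&Reverse(threshold)) = heap.peek() {
                while rg_plan.front().is_some_and(|&next| row_groups[next].max <= threshold) {
                    rg_plan.pop_front();
                    pruned += 1;
                    dropped_any = true;
                }
            }
        }
        if dropped_any {
            rebuilds += 1; // stands in for into_builder().with_row_groups(rest)
        }
    }
    let mut top: Vec<i64> = heap.into_iter().map(|r| r.0).collect();
    top.sort_unstable_by(|a, b| b.cmp(a));
    ScanStats { top, decoded, pruned, rebuilds }
}
```

In this toy model, visiting row groups by descending max raises the threshold early, so more of the plan is skipped; the TopK result is the same in either order as long as the pruner inspects the same sequence the decoder consumes.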
adriangb left a comment

Did some quick review but need to find more time to look in detail.
I want to double check:

- Logic for deciding when we need to go down a different path. I see this is using `DynamicFilterTracker::classify` but I'm not sure if we should always or optionally create a `RowGroupPruner`, etc.
- Logic for splitting runs. I thought the APIs we added in arrow-rs are flexible enough to not require splitting runs: they allow pausing a run at row group boundaries and changing it (e.g. skipping subsequent row groups), but maybe it doesn't have the right APIs...
```rust
// The pruner consults the predicate's `snapshot_generation` so the
// cost is one rebuild per dynamic-filter update, not per RG check.
```
Is this comment about `snapshot_generation` still current? I thought #22460 superseded that.
Addressed in latest PR.
Thanks @adriangb for the review!

(2) Splitting runs: already gone. 7e1aae2 collapses to a single decoder paused at RG boundaries, rebuilt via `decoder.into_builder().with_row_groups(remaining).build()` to skip pruned RGs. RowGroupRun /

(1) When to create `RowGroupPruner`: agreed, the current gate is too loose. Static predicates were already fully consumed by `prune_by_statistics` at open time, so a runtime pruner over them is wasted work. Will tighten

Inline on `snapshot_generation`: stale. The pruner already uses the `DynamicFilterTracker` watcher from #22460. Will fix the comment in the same follow-up.
Makes sense thanks! Could you do a pass of updating comments + PR description and maybe restacking the commits for easier review? |
I agree, PR description updated. I'll do the comment pass + commit restack tomorrow. |
…tes only

Address adriangb's review on apache#22450:

- Build `RowGroupPruner` only when `DynamicFilterTracking::classify` reports `Watching`. Static and already-complete predicates were fully consumed by `prune_by_statistics` at file open, so re-evaluating them per RG boundary was wasted work.
- Refresh stale comments left over from the multi-decoder design that the `into_builder` refactor superseded (`push_decoder.rs` module doc + struct docs, `source.rs` `fmt_extra` marker comment, `decoder_projection.rs` doc).
- Drop the `snapshot_generation` reference from `opener/mod.rs`; the pruner uses the `DynamicFilterTracker` watch channel from apache#22460.
Closes apache#22407.

DataFusion already prunes parquet at three granularities: file (`EarlyStoppingStream` + `FilePruner`), row group at scan startup (`PruningPredicate` → `RowGroupAccessPlanFilter`), and row inside an open RG (`RowFilter`). There's a gap in the middle: once row-group pruning runs at file open, that decision is frozen because any dynamic filter is still `lit(true)` at that point. As `TopK` tightens its threshold at runtime, subsequent RGs in the already-opened file keep getting decoded even when their stats already prove they cannot beat the threshold. This is the dominant cost for `ORDER BY ... LIMIT` queries on multi-RG files where file-level pruning can't help.

This PR closes the gap with a single decoder paused at row-group boundaries, a pruner consulted between row groups, and the decoder rebuilt via `into_builder()` to skip the row groups the pruner just rejected. Three coordinated pieces:

1. `RowGroupPruner` (`push_decoder.rs`) mirrors `FilePruner` at row-group granularity. It uses `DynamicFilterTracker` (apache#22460) to subscribe once to every not-yet-complete dynamic filter; `tracker.changed()` is a single atomic load, with no tree traversal per check. The cached `PruningPredicate` is rebuilt only when a watched filter has actually moved. Predicate construction errors and predicate evaluation errors are counted into two separate metrics.
2. Single-decoder iteration model (`PushDecoderStreamState::transition`). The opener builds one `ParquetPushDecoder` from the prepared access plan, and the stream uses arrow-rs 59's `ParquetRecordBatchReader` iterator to pause at row-group boundaries. At each boundary the pruner is consulted against the head of `rg_plan`; pruned indices are dropped and the decoder is rebuilt via `decoder.into_builder().with_row_groups(remaining).build()` so the skipped RGs are bypassed entirely. Already-fetched buffered bytes for downstream RGs carry across the rebuild.
3. Gate: build the pruner only when `DynamicFilterTracking::classify(&predicate)` reports `Watching` AND more than one row group remains. Static or already-complete predicates were fully consumed by `prune_by_statistics` at file open, so re-evaluating them per RG boundary would be wasted work.

Observability:

- New `Count` metric `row_groups_pruned_dynamic_filter` on `ParquetFileMetrics`.
- New `dynamic_rg_pruning=eligible` marker on `ParquetSource::fmt_extra` (Default + Verbose), emitted when the predicate has a still-watching dynamic portion.

Benchmarks (`benchmarks/sort_pushdown_inexact`, 5 iterations):

| Query | main | this PR | Δ |
|---|---|---|---|
| Q1 `ORDER BY l_orderkey DESC LIMIT 100` | 6.99 ms | 3.80 ms | -46% |
| Q2 `ORDER BY l_orderkey DESC LIMIT 1000` | 3.29 ms | 1.33 ms | -60% |
| Q3 `SELECT * ... DESC LIMIT 100` | 11.17 ms | 9.91 ms | -11% |
| Q4 `SELECT * ... DESC LIMIT 1000` | 9.28 ms | 7.95 ms | -14% |

Tests:

- 6 unit tests (3 in `push_decoder.rs::tests` for `RowGroupPruner`; 3 in `source.rs::tests` for the EXPLAIN marker).
- 3 integration tests in `datafusion/core/tests/parquet/dynamic_row_group_pruning.rs`.
- New SLT `dynamic_row_group_pruning.slt` covering both EXPLAIN surfaces.
- `cargo clippy --all-targets --all-features -- -D warnings` clean.
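The gate in piece 3 reduces to a small predicate. A minimal sketch, assuming a hypothetical local `Tracking` enum that mirrors the three outcomes of `DynamicFilterTracking::classify` (the real type and how it is obtained are not reproduced here), and a hypothetical `should_build_pruner` helper:

```rust
/// Hypothetical mirror of the three classification outcomes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Tracking {
    /// No dynamic filter in the predicate.
    Static,
    /// Dynamic filters present, but all already complete.
    AllComplete,
    /// At least one dynamic filter can still change.
    Watching,
}

/// Build a runtime row-group pruner only if the predicate can still move
/// and there is at least one later row-group boundary to act on.
pub fn should_build_pruner(tracking: Tracking, remaining_row_groups: usize) -> bool {
    matches!(tracking, Tracking::Watching) && remaining_row_groups > 1
}
```

With a single remaining row group there is no later boundary at which a tightened filter could skip anything, which is why the row-group count is part of the gate alongside the classification.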
Branch force-pushed from 3e23ad1 to ef23530.
All three done:

Ready for another look, thanks @adriangb!
run benchmark topk_tpch

🤖 Benchmark running (GKE): comparing feat/topk-rg-level-dynamic-pruning (ef23530) to d428760 (merge-base) using topk_tpch.

🤖 Benchmark completed (GKE): topk_tpch, base (merge-base) vs. branch.
Which issue does this PR close?
Closes #22407.
Rationale for this change
DataFusion already prunes parquet at three granularities: file (`EarlyStoppingStream` + `FilePruner`), row group at scan startup (`PruningPredicate` → `RowGroupAccessPlanFilter`), and row inside an open RG (`RowFilter`). There's a gap in the middle: once row-group pruning runs at file open, that decision is frozen because any dynamic filter is still `lit(true)` at that point. As `TopK` tightens its threshold at runtime, subsequent RGs in the already-opened file keep getting decoded even when their stats already prove they cannot beat the threshold. This is the dominant cost for `ORDER BY ... LIMIT` queries on multi-RG files where file-level pruning can't help (a single large file, or scrambled-RG multi-file).

See the issue for a full architectural diagram and a concrete trace showing where the wasted I/O / decompression / decode lives.
What changes are included in this PR?
A single decoder paused at row-group boundaries, with a pruner consulted between row groups and the decoder rebuilt via `into_builder()` to skip the row groups the pruner just rejected. Three coordinated pieces:
`RowGroupPruner` (`datafusion/datasource-parquet/src/push_decoder.rs`) mirrors `FilePruner` at row-group granularity. It uses the `DynamicFilterTracker` API from #22460 (cheap dynamic-filter change detection) to subscribe once to every not-yet-complete dynamic filter in the predicate; `tracker.changed()` is a single atomic load, with no tree traversal per check. The cached `PruningPredicate` is rebuilt only when a watched filter has actually moved, then evaluated against the next pending row group's statistics via the existing `RowGroupPruningStatistics` adapter. Predicate construction errors and predicate evaluation errors are counted into two separate metrics, so a flaky predicate path can never silently drop data.
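A minimal sketch of the change-detection idea, under stated assumptions: the dynamic filter is modeled as a hypothetical `DynamicThreshold` holding an `AtomicU64` generation counter that is bumped on every update, and a cached `Option<i64>` stands in for the cached `PruningPredicate`. None of these names are the real `DynamicFilterTracker` API, and the two error-counting metrics are not modeled.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical dynamic filter: the current TopK threshold plus a generation
/// counter that is bumped after every update.
struct DynamicThreshold {
    generation: AtomicU64,
    threshold: Mutex<Option<i64>>,
}

impl DynamicThreshold {
    fn new() -> Self {
        DynamicThreshold { generation: AtomicU64::new(0), threshold: Mutex::new(None) }
    }

    /// Publish a new threshold, then bump the generation so watchers notice.
    fn update(&self, t: i64) {
        *self.threshold.lock().unwrap() = Some(t);
        self.generation.fetch_add(1, Ordering::Release);
    }
}

/// Hypothetical row-group pruner for `ORDER BY col DESC` that caches its
/// "compiled" predicate and refreshes it only when the filter has moved.
struct Pruner {
    filter: Arc<DynamicThreshold>,
    seen_generation: u64,
    cached: Option<i64>, // stand-in for the cached `PruningPredicate`
    rebuilds: usize,
}

impl Pruner {
    fn new(filter: Arc<DynamicThreshold>) -> Self {
        // Read the generation before the value: if an update races in between,
        // the next `changed()` check fires again and refreshes.
        let seen_generation = filter.generation.load(Ordering::Acquire);
        let cached = *filter.threshold.lock().unwrap();
        Pruner { filter, seen_generation, cached, rebuilds: 0 }
    }

    /// The cheap check: one atomic load and a compare, no expression walk.
    fn changed(&self) -> bool {
        self.filter.generation.load(Ordering::Acquire) != self.seen_generation
    }

    /// Returns true if a row group with this `max` statistic can be skipped.
    fn prune(&mut self, rg_max: i64) -> bool {
        if self.changed() {
            self.seen_generation = self.filter.generation.load(Ordering::Acquire);
            self.cached = *self.filter.threshold.lock().unwrap();
            self.rebuilds += 1;
        }
        // Assumes the filter is `col > threshold`, so `max <= threshold` is unwinnable.
        matches!(self.cached, Some(t) if rg_max <= t)
    }
}
```

Because a TopK threshold only tightens, a pruner that briefly holds a stale threshold just prunes less, never more; a racing update makes `changed()` fire again on the next check.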
Single-decoder iteration model (`PushDecoderStreamState::transition`). The opener builds one `ParquetPushDecoder` from the prepared access plan, and the stream uses arrow-rs 59's `ParquetRecordBatchReader` iterator to pause at row-group boundaries. At each boundary the pruner is consulted against the head of `rg_plan` (the remaining row-group indices). If the pruner proves the head RG unwinnable, that index is dropped from the plan and the decoder is rebuilt via `decoder.into_builder().with_row_groups(remaining).build()`, so the skipped RGs are bypassed entirely: no decode, no row-filter evaluation. Already-fetched buffered bytes for downstream RGs carry across the rebuild.
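The boundary loop can be simulated with the standard library alone. This is an illustrative model, not the `PushDecoderStreamState::transition` code: `RowGroup`, `TopK` and `scan` are hypothetical; row-group stats are derived from the values rather than from parquet metadata; "rebuilding the decoder" is reduced to continuing with the shortened `rg_plan`; and it assumes `ORDER BY v DESC LIMIT k` with a strict `v > threshold` filter.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// Illustrative row group: the sort-column values it contains.
struct RowGroup {
    values: Vec<i64>,
}

impl RowGroup {
    /// Stand-in for the parquet max statistic.
    fn max(&self) -> Option<i64> {
        self.values.iter().copied().max()
    }
}

/// Minimal TopK for `ORDER BY v DESC LIMIT k`: a min-heap of the k largest values.
struct TopK {
    k: usize,
    heap: BinaryHeap<Reverse<i64>>,
}

impl TopK {
    fn new(k: usize) -> Self {
        TopK { k, heap: BinaryHeap::new() }
    }

    fn insert(&mut self, v: i64) {
        if self.heap.len() < self.k {
            self.heap.push(Reverse(v));
        } else if let Some(&Reverse(min)) = self.heap.peek() {
            if v > min {
                self.heap.pop();
                self.heap.push(Reverse(v));
            }
        }
    }

    /// The threshold a dynamic filter would publish (`v > threshold`) once full.
    fn threshold(&self) -> Option<i64> {
        if self.heap.len() == self.k {
            self.heap.peek().map(|r| r.0)
        } else {
            None
        }
    }
}

/// Walk `rg_plan`; at each row-group boundary, consult the "pruner" on the
/// head of the plan and drop it instead of decoding it when it cannot win.
/// Returns the top-k values (descending) and the number of pruned row groups.
fn scan(row_groups: &[RowGroup], mut rg_plan: VecDeque<usize>, k: usize) -> (Vec<i64>, usize) {
    let mut topk = TopK::new(k);
    let mut pruned = 0;
    while let Some(head) = rg_plan.pop_front() {
        let unwinnable = match (topk.threshold(), row_groups[head].max()) {
            (Some(t), Some(max)) => max <= t,
            _ => false,
        };
        if unwinnable {
            // Here the real stream would rebuild its decoder over the remaining plan.
            pruned += 1;
            continue;
        }
        for &v in &row_groups[head].values {
            topk.insert(v);
        }
    }
    let mut top: Vec<i64> = topk.heap.into_iter().map(|r| r.0).collect();
    top.sort_unstable_by(|a, b| b.cmp(a));
    (top, pruned)
}
```

With five 5-row RGs holding 1..=25 and a plan ordered so the RG with the largest values comes first (`[4, 3, 2, 1, 0]`), the first RG fills the heap with 21..=25, the threshold becomes 21, and the other four RGs are dropped at their boundaries without being decoded. In natural order (`[0, 1, 2, 3, 4]`) every RG's max beats the current threshold, so nothing is pruned; read order matters.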
Gate: build the pruner only when the predicate actually moves. The opener creates a `RowGroupPruner` only when `DynamicFilterTracking::classify(&predicate)` reports `Watching` (at least one not-yet-complete dynamic filter) and more than one row group remains in the access plan. Static or already-complete predicates were fully consumed by `prune_by_statistics` at file open, so re-evaluating them per RG boundary would be wasted work.
The earlier multi-decoder design (`PendingDecoderRun`, `ParquetAccessPlan::split_runs`, `force_per_row_group`) is removed: arrow-rs 59's `into_builder` + `with_row_groups` makes a single decoder strictly more capable.
Observability
- A new `Count` metric `row_groups_pruned_dynamic_filter` on `ParquetFileMetrics` surfaces the runtime saving.
- A new `dynamic_rg_pruning=eligible` marker on `ParquetSource`'s `EXPLAIN` output (`fmt_extra`, Default + Verbose) signals plan-time eligibility and is emitted whenever the predicate has a still-watching dynamic portion. It says `eligible` rather than `true` because the static plan can't predict the runtime outcome.
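A sketch of how the plan-time marker might be emitted. `fmt_extra` here is a hypothetical free function over `std::fmt::Write`, not `ParquetSource`'s actual display implementation, and the `predicate=` prefix and field order are assumptions; the three cases mirror the `source.rs` unit tests described in the testing section.

```rust
use std::fmt::{self, Write};

/// Hypothetical: append scan-node details to an EXPLAIN line.
fn fmt_extra<W: Write>(out: &mut W, predicate: Option<&str>, has_watching_dynamic: bool) -> fmt::Result {
    if let Some(p) = predicate {
        write!(out, ", predicate={p}")?;
        // Plan-time eligibility only: whether anything is pruned is decided at runtime.
        if has_watching_dynamic {
            write!(out, ", dynamic_rg_pruning=eligible")?;
        }
    }
    Ok(())
}
```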
Benchmarks (`benchmarks/sort_pushdown_inexact`, 5 iterations)

| Query | main | this PR | Δ |
|---|---|---|---|
| Q1 `ORDER BY l_orderkey DESC LIMIT 100` | 6.99 ms | 3.80 ms | -46% |
| Q2 `ORDER BY l_orderkey DESC LIMIT 1000` | 3.29 ms | 1.33 ms | -60% |
| Q3 `SELECT * ... DESC LIMIT 100` | 11.17 ms | 9.91 ms | -11% |
| Q4 `SELECT * ... DESC LIMIT 1000` | 9.28 ms | 7.95 ms | -14% |

Narrow-projection queries gain the most: their per-RG cost is dominated by metadata + sort-column read, which this PR eliminates for unwinnable RGs. Wide-projection queries gain less, because the kept RG's all-column decode dominates total time, but they still save 11-14%.
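For reference, the Δ column is the relative change from `main` to this PR, `(branch - main) / main × 100`. A small helper (hypothetical names `pct_change` and `speedup`) reproduces the reported percentages from the commit-message figures when rounded to whole percent.

```rust
/// Relative change from `main` to the branch, in percent (negative = faster).
fn pct_change(main_ms: f64, branch_ms: f64) -> f64 {
    (branch_ms - main_ms) / main_ms * 100.0
}

/// The same comparison expressed as a speedup factor.
fn speedup(main_ms: f64, branch_ms: f64) -> f64 {
    main_ms / branch_ms
}
```

For example, Q2's -60% corresponds to roughly a 2.5x speedup (3.29 ms / 1.33 ms).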
Are these changes tested?
Three layers:
- Unit tests
  - `push_decoder.rs::tests`: `RowGroupPruner` basic pruning, tracker-driven dynamic-filter updates, and fallback when the predicate has no analyzable bounds.
  - `source.rs::tests`: the `dynamic_rg_pruning=eligible` marker is present on a dynamic predicate, absent on a static predicate, and absent when there is no predicate at all.
- Integration tests in `datafusion/core/tests/parquet/dynamic_row_group_pruning.rs`: one asserts `row_groups_pruned_dynamic_filter >= 1` end-to-end on a 5-RG `ORDER BY DESC LIMIT 5` scan; a regression test for the `prepare_access_plan` reorder bug uses `ORDER BY ASC` against a file written in descending value order so the sort-pushdown reorder is exercised; and a quiet-without-TopK test asserts the metric stays at 0 (no spurious firing).
- SLT `datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt`: asserts both `EXPLAIN` surfaces. Plain `EXPLAIN` shows `dynamic_rg_pruning=eligible`, and `EXPLAIN ANALYZE` pins `row_groups_pruned_dynamic_filter=4` (five RGs, four pruned at runtime).

`cargo clippy --all-targets --all-features -- -D warnings` clean.
Are there any user-facing changes?
Two visible additions, both gated on the existing dynamic-filter infrastructure:
- a `row_groups_pruned_dynamic_filter` counter, visible in `EXPLAIN ANALYZE` for queries whose plan carries a `DynamicFilterPhysicalExpr` (today: only TopK with `enable_topk_dynamic_filter_pushdown=true`, which is the default);
- a `dynamic_rg_pruning=eligible` marker, visible in `EXPLAIN` output for the same queries.

No config changes, no API breakage, and no behavior change for queries without a dynamic predicate.