Adaptive (runtime, stats-based) conjunct reordering for FilterExec#22698
Adaptive (runtime, stats-based) conjunct reordering for FilterExec#22698adriangb wants to merge 12 commits into
Conversation
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
a24471d to
4d7b733
Compare
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: predicate_eval File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagepredicate_eval — base (merge-base)
predicate_eval — branch
File an issue against this benchmark runner |
…s no proposal" This reverts commit be79329. The exploration trials won the correlated case (predicate_eval q73, 1.28x) but dragged tied and guard-pattern filters through per-query trial churn: CI showed TPC-DS 4 faster / 12 slower and predicate_eval cost_q10/q11 at 1.15-1.16x. Two rounds of statistical hardening (sequential early stopping, a 5% adoption margin) moved the cost between shapes rather than removing it — exploration's wins amortize across queries, but its costs are paid per query, so it structurally needs cross-query persistence of the learned order (cf. DuckDB's multi-file adaptive filter cache) before it can be on by default. Parked, with the early-stopping/margin work, for a follow-up on top of order persistence. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
run benchmark predicate_eval env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: predicate_eval File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmark predicate_eval env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: predicate_eval File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagepredicate_eval — base (merge-base)
predicate_eval — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagepredicate_eval — base (merge-base)
predicate_eval — branch
File an issue against this benchmark runner |
…biter
Split the responsibility of coordinating "which arrangement should a
collection of filters execute with" from what an arrangement is. The
champion/challenger machinery FilterExec's adaptive reordering uses — the
epoch-broadcast champion, the shared paired A/B trial ledger, the
CI-separated verdict, the rejected-candidate memory — was entangled with
ordering-specific types (conjunct permutations, the evaluation strategy)
inside datafusion-physical-plan, but none of it depends on what is being
decided.
Move it into the adaptive substrate as AdaptiveArbiter<A>, generic over an
opaque arrangement type: FilterExec instantiates it with
{order, strategy}, and an adaptive parquet scan deciding filter *placement*
(row filter vs post-scan, i.e. when to late-materialize — cf. the apache#22144
experiment, which already tracks the same per-filter stats) can instantiate
it with a placement assignment while reusing the identical trial protocol
and the existing SelectivityStats/AdaptiveStatsRegistry. The substrate now
has three policy-free pieces: per-filter measurement, the concurrent
registry, and decision coordination; consumers own proposals, scheduling,
and arrangement semantics.
No behavior change: FilterExec's adaptive path is a mechanical rewire
(AdaptiveFilterShared = registry + arbiter), verified by the unchanged unit
suite and an unchanged perf-harness profile.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (917e543) to 0f8a121 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (917e543) to 0f8a121 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (917e543) to 0f8a121 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Which issue does this PR close?
(static cheap/expensive heuristic reordering). No single issue; happy to file
one if useful.
Rationale for this change
Predicate evaluation order matters: running a selective predicate first lets it
gate the work of the predicates after it. The static cheap/expensive heuristic
(#22343) sorts conjuncts into two cost classes and stable-sorts within each, so
it does nothing to order multiple similarly-expensive predicates; and
BinaryExpr'sANDshort-circuit only gates on a leftmost selective conjunct.So a conjunction of several expensive predicates whose selective member is not
written first is evaluated with every predicate scanning ~every row — and
neither mechanism fixes it.
This PR adds runtime, statistics-based conjunct reordering for
FilterExec:it measures each conjunct's selectivity and cost on the rows that actually reach
it and runs the ones that discard the most rows per unit of CPU time first.
Maximising discards-per-second is exactly minimising
cost_per_row / (1 - pass_rate),the classic optimal ordering key for independent conjuncts.
It is off by default (
datafusion.execution.adaptive_filter_reordering).What changes are included in this PR?
Split into four reviewable commits:
physical-expr-common: adaptive selectivity-stats substrate — apolicy-free
SelectivityStats(online selectivity + cost with Welfordmean/variance and confidence bounds) and a concurrent
AdaptiveStatsRegistry.Reusable by other consumers (e.g. a future parquet-scan integration).
common: config flagexecution.adaptive_filter_reordering(defaultfalse), plus regenerated
configs.md/information_schemalisting.physical-plan: adaptive conjunct reordering inFilterExec— astream-local evaluator that:
BinaryExpr's pre-selection) and measures each marginally;statistically certain (adjacent confidence intervals stop overlapping),
or after a small sample cap if the conjuncts are indistinguishable;
ANDin the learnedorder and evaluates it as an ordinary predicate (no measurement overhead,
inherits
BinaryExprpre-selection);drift, so steady-state overhead decays toward zero.
State is stream-local; the plan, results, and
EXPLAINare unchanged.Tests — an end-to-end
.sltasserting identical results/plan with theflag on and off.
Are these changes tested?
Yes:
SelectivityStats, registry) and theFilterExecevaluator (gating correctness, certainty-freeze, re-thaw backoff,drift adaptation).
adaptive_filter.slt: results andEXPLAINidentical with the flag on/off.Are there any user-facing changes?
One new config option,
datafusion.execution.adaptive_filter_reordering(experimental, default false). When enabled, the order in which a conjunctive
filter's predicates are evaluated may change at runtime; results are unchanged,
but observable side effects of fallible predicates could differ (predicates
containing volatile expressions are never reordered).