Limit pushdown with filters + sort-aware reverse output #7
Signed-off-by: Christoph Schulze <christoph.schulze@polygon.io>
zhuqi-lucas
left a comment
LGTM, two things worth addressing before merge:
- Test gap: `tasks.rs:182` (the `reserved < true_count` branch, where the LIMIT
  straddles a split mid-way and `mask.limit(reserved)` trims the mask) is not
  covered. `order_by_desc_limit_returns_largest_rows` only exercises the
  "first split fills the budget" fast path; later splits short-circuit at the
  top-of-function check (line 87-91) without entering the filter+reserve
  block. Worth a test that tunes filter selectivity so LIMIT crosses a split
  boundary; it's the most subtle correctness path in this PR.
- Doc nit on `try_reverse_output`: please pin the mental model. This is
  "scan emits rows already in target order, atomic counter caps emission,"
  not the dynamic-filter-pushdown shape used by `ParquetSource` for unsorted
  `ORDER BY x LIMIT k`. The `SortExec(TopK)` above the rebuilt source is a
  passthrough cap + cross-split merge, not the correctness guarantor. One
  sentence prevents a future reader coming from the Parquet path from
  misreading the design.
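To make the uncovered branch concrete, here is a minimal single-threaded sketch of the straddling case. It is not the code in `tasks.rs`: `apply_split` and `limit_mask` are hypothetical names, a plain `usize` stands in for the shared atomic counter, and a `Vec<bool>` stands in for the real mask type.

```rust
/// Keep only the first `keep` selected rows of a boolean mask.
/// Hypothetical stand-in for the `mask.limit(reserved)` call named above.
fn limit_mask(mask: &[bool], keep: usize) -> Vec<bool> {
    let mut kept = 0;
    mask.iter()
        .map(|&selected| {
            if selected && kept < keep {
                kept += 1;
                true
            } else {
                false
            }
        })
        .collect()
}

/// Apply one split's filter mask against the remaining LIMIT budget.
/// Returns the mask of rows that are actually emitted.
fn apply_split(remaining: &mut usize, mask: &[bool]) -> Vec<bool> {
    // Fast path: the budget is already exhausted, so this split emits nothing.
    if *remaining == 0 {
        return vec![false; mask.len()];
    }
    let true_count = mask.iter().filter(|&&b| b).count();
    let reserved = true_count.min(*remaining);
    *remaining -= reserved;
    if reserved < true_count {
        // The LIMIT straddles this split: trim the mask to the reserved rows.
        limit_mask(mask, reserved)
    } else {
        mask.to_vec()
    }
}
```

With a budget of 5, a first split that matches 3 rows passes through untouched, a second split that matches 4 rows is trimmed to its first 2 matches, and any later split hits the fast path. A test of this shape needs filter selectivity low enough that the first split does not fill the budget on its own.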
/// checker. The planner keeps the upstream `SortPreservingMergeExec` but the reversed
/// scan still produces locally-sorted streams in the requested order, which is what
/// enables filter + LIMIT pushdown to short-circuit on the first file.
fn try_reverse_output(
Vortex actually uses exact reverse, since it supports reverse reading; we can add a comment about this.
I updated the doc comment and added a todo for when we bump to DF 53 to switch to the exact hint.
Signed-off-by: Christoph Schulze <christoph.schulze@polygon.io>
Summary
Two related perf gaps surfaced while investigating reverse order scans:
- `LIMIT` is silently dropped when a filter is set. `ScanBuilder` bailed outright on `filter + limit`, and `VortexOpener` worked around it by gating `with_limit` on `filter.is_none()`. The result was that `SELECT … WHERE x > k LIMIT n` always scanned every file end-to-end.
- `ORDER BY ts DESC` against a table declared `WITH ORDER (ts ASC)` doesn't exploit reverse scanning. `VortexSource::with_reversed` already existed and `ScanBuilder` already supported reverse intra-file scans, but no `FileSource::try_reverse_output` hook was wired up. So DataFusion always inserted a full `SortExec` instead of letting the source absorb the reversal.

Together they meant that the canonical "latest N matching rows" query (e.g. `SELECT * FROM t WHERE x > k ORDER BY ts DESC LIMIT 100` over time-partitioned files) read every file and every row. After the changes in this PR it typically reads only the last file, short-circuiting once the limit is satisfied.

`SELECT * FROM t WHERE x > k ORDER BY ts DESC LIMIT 100` over time-partitioned files now executes as:
- … `ORDER BY ts DESC`, table's output ordering is `ts ASC`. Calls `VortexSource::try_pushdown_sort`.
- … `Exact` with `reversed=true`. DataFusion reverses `file_groups` (so the latest file comes first) and removes the `SortExec`.
- … `x > k` via `try_pushdown_filters` (already in place).
- … `LIMIT 100` via `with_fetch` on `DataSourceExec` (already in place; passes through to `base_config.limit`).
- … `filter.is_none()` guard).
- … `remaining = 40`.
- … `remaining = 40`, reserves 40, exits early after enough rows. Done.

`reversed` is now surfaced in `fmt_extra` so plan rendering shows `reversed: true`, which makes the pushdown observable in tests.

Edge cases
- … `Unsupported`. `is_reverse` is strict; we return `Unsupported`, the planner falls back to a full `SortExec`. Correct, just not optimal.
- … `reversed = !reversed`).

Risks & limitations
- `compare_exchange_weak` allows spurious failures; bounded by physical concurrency. 32-thread stress test (`reserve_up_to_concurrent_never_oversubscribes`) exercises this.
- … `LimitExec` truncates. We pay extra IO, never wrong rows.
- `ORDER BY ts DESC, id ASC` (multi-key with mixed direction): `is_reverse` is strict; we return `Unsupported`, planner falls back to `SortExec`. Correct, just not optimal.
- `SortExec` even after reverse pushdown: … `Inexact` (Parquet does the same on 52). The TopK still benefits from `fetch` pushdown and from the source emitting locally-sorted streams.

API Changes
- `VortexSource::with_reversed(bool) -> Self`
- `VortexSource::try_reverse_output(&self, &[PhysicalSortExpr], &EquivalenceProperties) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>>`
- `ScanBuilder::with_reversed(bool) -> Self` and `ScanBuilder::reversed(&self) -> bool` (already existed in the type, now stamped into the public-api lock).
- `public-api.lock` files regenerated via `cargo xtask public-api`.

Testing
- `vortex-scan/src/tasks.rs` (CAS unit tests):
  - `reserve_up_to_drains_counter`: sequential reservations drain to zero.
  - `reserve_up_to_concurrent_never_oversubscribes`: 32-thread stress proves the CAS loop never lets the counter underflow or hand out more than the initial budget.
- `vortex-file/src/tests.rs` (filter + limit at the file level):
  - `limit_with_filter_truncates_globally`: multi-chunk scan with a filter and `limit=7` returns exactly 7 rows. (Previously this combination would have bailed in `ScanBuilder::prepare`.)
  - `limit_zero_with_filter_yields_no_rows`: `limit=0` short-circuits even when a filter is present.
- `vortex-datafusion/src/persistent/mod.rs` (sort pushdown):
  - `reverse_pushdown_swaps_file_order`: `WITH ORDER (ts ASC)` + `ORDER BY ts DESC LIMIT k` produces the largest k values, the plan shows `SortExec(TopK)` and the source is marked `reversed: true`.
  - `reverse_pushdown_rejected_for_unrelated_ordering`: `WITH ORDER (a ASC)`, query asks `ORDER BY b DESC`. Plan keeps the `Sort` and source is not `reversed`.
  - `reverse_pushdown_no_ordering_declared`: no `WITH ORDER` → no reverse pushdown.
  - `reverse_pushdown_then_filter`: filter pushdown still works alongside reverse sort pushdown.
  - `order_by_desc_limit_returns_largest_rows`: end-to-end synergy: 5 files of 100 rows each, filter + `ORDER BY ts DESC LIMIT 10` returns ts `[490..500]` and the source is reversed in the plan.
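
For readers unfamiliar with the pattern, the CAS reservation that the `reserve_up_to_*` tests exercise can be sketched roughly as follows. This is an illustration under assumptions, not the code in `vortex-scan/src/tasks.rs`: the `AtomicU64` budget, the function signature, the memory orderings, and the `run_stress` helper are all hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Try to reserve up to `want` rows from a shared budget.
/// Returns how many rows were actually granted (possibly 0).
fn reserve_up_to(remaining: &AtomicU64, want: u64) -> u64 {
    let mut current = remaining.load(Ordering::Acquire);
    loop {
        let take = current.min(want);
        if take == 0 {
            return 0;
        }
        match remaining.compare_exchange_weak(
            current,
            current - take,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return take,
            // Spurious failure, or another thread won the race:
            // retry against the value that was actually observed.
            Err(observed) => current = observed,
        }
    }
}

/// Spawn `threads` workers that each call `reserve_up_to` `attempts` times.
/// Returns (total rows granted, rows left in the budget).
fn run_stress(budget: u64, threads: usize, attempts: usize, want: u64) -> (u64, u64) {
    let remaining = Arc::new(AtomicU64::new(budget));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let remaining = Arc::clone(&remaining);
            thread::spawn(move || {
                let mut granted = 0u64;
                for _ in 0..attempts {
                    granted += reserve_up_to(&remaining, want);
                }
                granted
            })
        })
        .collect();
    let total: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    (total, remaining.load(Ordering::Acquire))
}
```

Because the new value is always computed from the value the CAS just observed, a grant can never exceed what is left, so the counter cannot underflow. When total demand exceeds the budget, as in `run_stress(100, 32, 10, 7)`, the grants sum to exactly the initial budget and the counter ends at zero.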