
PhysicalExprSimplifier folds DynamicFilterPhysicalExpr wrapper, breaking live link to TopK updates #22949

@zhuqi-lucas


Describe the bug

`PhysicalExprSimplifier` does not recognize `DynamicFilterPhysicalExpr` as a live mutable wrapper. When run over an expression that contains a `DynamicFilterPhysicalExpr`, the simplifier folds the wrapper into a stale literal snapshot, so subsequent `update()` calls on the original wrapper are no longer visible through the simplified output.

This is in contrast to the schema adapter (`DefaultPhysicalExprAdapterFactory`), which uses `TreeNode::rewrite` + `with_new_children` and preserves the wrapper (and its `Arc`) correctly.
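The adapter path can be modeled with a toy wrapper, assuming (as the issue implies) that rebuilding the wrapper with new children keeps it pointing at the original's shared inner state. `Expr`, `LiveFilter`, `gt` and `current` are hypothetical names, not DataFusion APIs. A fuller model would also remap the column references inside `inner`; that detail is omitted here.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical toy expression tree; not DataFusion's `PhysicalExpr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
}

// Hypothetical live wrapper: `inner` sits behind a shared lock, so every
// wrapper holding a clone of this Arc observes `update()` calls.
struct LiveFilter {
    children: Vec<Expr>,
    inner: Arc<RwLock<Expr>>,
}

impl LiveFilter {
    fn new(children: Vec<Expr>, inner: Expr) -> Self {
        LiveFilter { children, inner: Arc::new(RwLock::new(inner)) }
    }

    // Analogue of `with_new_children`: swap in remapped children but keep
    // pointing at the same shared inner state.
    fn with_new_children(&self, children: Vec<Expr>) -> Self {
        LiveFilter { children, inner: Arc::clone(&self.inner) }
    }

    fn update(&self, new_inner: Expr) {
        *self.inner.write().unwrap() = new_inner;
    }

    fn current(&self) -> Expr {
        self.inner.read().unwrap().clone()
    }
}

fn gt(l: Expr, r: Expr) -> Expr {
    Expr::Gt(Box::new(l), Box::new(r))
}

fn main() {
    let original = LiveFilter::new(
        vec![Expr::Column(0)],
        gt(Expr::Column(0), Expr::Literal(100)),
    );
    // "Adapter" step: logical column 0 is physical column 1.
    let adapted = original.with_new_children(vec![Expr::Column(1)]);

    // Runtime feedback tightens the filter after adaptation.
    original.update(gt(Expr::Column(0), Expr::Literal(500)));

    // The rebuilt wrapper still sees the update through the shared lock.
    assert_eq!(adapted.current(), gt(Expr::Column(0), Expr::Literal(500)));
    println!("adapted children = {:?}", adapted.children);
}
```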

To Reproduce

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_expr::expressions::{BinaryExpr, Column, DynamicFilterPhysicalExpr, lit};
use datafusion::physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory};
use datafusion::physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion::physical_plan::PhysicalExpr;
use datafusion_expr::Operator;

// Schemas with a column-position mismatch so the adapter has work to do.
let logical: SchemaRef = Arc::new(Schema::new(vec![
    Field::new("ticker", DataType::Int32, false),
]));
let physical: SchemaRef = Arc::new(Schema::new(vec![
    Field::new("extra", DataType::Int64, false),
    Field::new("ticker", DataType::Int32, false),
]));

// Build a DynamicFilterPhysicalExpr wrapping ticker > 100.
let ticker_col: Arc<dyn PhysicalExpr> =
    Arc::new(Column::new_with_schema("ticker", logical.as_ref()).unwrap());
let inner: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
    Arc::clone(&ticker_col),
    Operator::Gt,
    lit(100_i32),
));
let original: Arc<DynamicFilterPhysicalExpr> =
    Arc::new(DynamicFilterPhysicalExpr::new(vec![Arc::clone(&ticker_col)], inner));

// Path 1: adapter only -- wrapper survives, live link preserved.
let adapter = DefaultPhysicalExprAdapterFactory
    .create(Arc::clone(&logical), Arc::clone(&physical))
    .unwrap();
let adapted = adapter
    .rewrite(Arc::clone(&original) as Arc<dyn PhysicalExpr>)
    .unwrap();
assert!(
    adapted.as_any().is::<DynamicFilterPhysicalExpr>(),
    "adapter should preserve DynamicFilterPhysicalExpr wrapper",
);

// Path 2: adapter + simplifier -- wrapper is folded into a stale snapshot.
let simplifier = PhysicalExprSimplifier::new(&physical);
let adapted_and_simplified = simplifier
    .simplify(
        adapter
            .rewrite(Arc::clone(&original) as Arc<dyn PhysicalExpr>)
            .unwrap(),
    )
    .unwrap();

// Update the original AFTER simplification.
let new_inner: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
    Arc::clone(&ticker_col),
    Operator::Gt,
    lit(500_i32),
));
original.update(new_inner).unwrap();

// Path 1's output sees the update; path 2's output is stuck at the
// pre-update snapshot, so anything downstream (e.g. PruningPredicate
// built from it) silently uses the stale threshold.

Expected behavior

`PhysicalExprSimplifier` should treat `DynamicFilterPhysicalExpr` as opaque (return it unchanged from the simplification pass), the same way `TreeNode::rewrite` does via `with_new_children`. The wrapper's `Arc` is the point of contact for live updates from TopK / HashJoin runtime feedback, and folding it breaks that contract.

Actual behavior

The simplifier descends into the wrapper, fetches the current snapshot, and folds based on that snapshot. The returned expression is no longer a `DynamicFilterPhysicalExpr` and is decoupled from any subsequent `update()` on the original wrapper. Any downstream consumer (e.g. `build_pruning_predicate`) then sees a stale literal where it expected a live filter.
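The difference between the reported and the expected behavior can be reduced to a toy model. `Expr::Dynamic`, `simplify_folding`, `simplify_opaque`, `fold_gt` and `eval` are hypothetical names, not DataFusion APIs: `simplify_folding` mimics the snapshot-and-fold behavior described above, while `simplify_opaque` returns the wrapper untouched.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical toy tree. `Dynamic` stands in for a live wrapper such as
// DynamicFilterPhysicalExpr; it is not DataFusion's API.
#[derive(Debug, Clone)]
enum Expr {
    Column, // the single column `ticker`
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
    Dynamic(Arc<RwLock<Expr>>),
}

// Folds literal comparisons; shared by both simplifiers below.
fn fold_gt(l: Expr, r: Expr) -> Expr {
    match (l, r) {
        (Expr::Literal(a), Expr::Literal(b)) => Expr::Literal((a > b) as i64),
        (l, r) => Expr::Gt(Box::new(l), Box::new(r)),
    }
}

// Reported behavior: read the wrapper's current snapshot and simplify
// that, dropping the wrapper (and its shared state) on the floor.
fn simplify_folding(e: &Expr) -> Expr {
    match e {
        Expr::Dynamic(inner) => simplify_folding(&inner.read().unwrap()),
        Expr::Gt(l, r) => fold_gt(simplify_folding(l), simplify_folding(r)),
        other => other.clone(),
    }
}

// Expected behavior: treat the wrapper as opaque and return it as-is.
fn simplify_opaque(e: &Expr) -> Expr {
    match e {
        Expr::Dynamic(inner) => Expr::Dynamic(Arc::clone(inner)),
        Expr::Gt(l, r) => fold_gt(simplify_opaque(l), simplify_opaque(r)),
        other => other.clone(),
    }
}

// Evaluates an expression for one value of `ticker` (1 = true, 0 = false).
fn eval(e: &Expr, ticker: i64) -> i64 {
    match e {
        Expr::Column => ticker,
        Expr::Literal(v) => *v,
        Expr::Gt(l, r) => (eval(l, ticker) > eval(r, ticker)) as i64,
        Expr::Dynamic(inner) => eval(&inner.read().unwrap(), ticker),
    }
}

fn main() {
    let gt = |t: i64| Expr::Gt(Box::new(Expr::Column), Box::new(Expr::Literal(t)));
    let live = Arc::new(RwLock::new(gt(100)));
    let filter = Expr::Dynamic(Arc::clone(&live));

    let folded = simplify_folding(&filter);
    let opaque = simplify_opaque(&filter);

    // TopK tightens the threshold after simplification.
    *live.write().unwrap() = gt(500);

    // ticker = 300 passes the stale `> 100` but fails the live `> 500`.
    println!("folded: {}, opaque: {}", eval(&folded, 300), eval(&opaque, 300));
}
```

Running it prints `folded: 1, opaque: 0`: the folded copy keeps accepting a value the live filter now rejects.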

Why this matters

This came up in a real downstream setup where a custom Parquet opener uses the `DynamicFilterPhysicalExpr` produced by a TopK `SortExec` as the input to incremental row-group pruning. The pruner rebuilds its `PruningPredicate` whenever `snapshot_generation()` changes; if the predicate is fed through the standard schema-adaptation helper (which composes adapter + simplifier), the resulting pruning predicate is a no-op against the live filter. Observed end-to-end impact:

  • `row_groups_pruned_statistics`: 0 (expected ≈ 99% pruned)
  • `bytes_scanned`: 91 MB (expected ≈ 136 KB)
  • `time_elapsed_processing`: 31 s (expected ≈ 100 ms)

The dynamic filter itself was correctly populated and visible in `EXPLAIN ANALYZE`; only the simplifier-folded copy used by the pruner was stale.
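The pruner's rebuild-on-generation-change pattern, and why a folded copy defeats it, can be sketched as follows. `LiveThreshold`, `Pruner` and `count_kept` are hypothetical; the `snapshot_generation()` here is only loosely modeled on the method named above, and a single integer threshold stands in for a full `PruningPredicate`. A pruner fed a frozen copy never observes a new generation, so it keeps pruning against the original threshold.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

// Hypothetical live filter `ticker > threshold` with a generation counter
// that is bumped on every update.
struct LiveThreshold {
    threshold: RwLock<i64>,
    generation: AtomicU64,
}

impl LiveThreshold {
    fn new(threshold: i64) -> Self {
        LiveThreshold { threshold: RwLock::new(threshold), generation: AtomicU64::new(0) }
    }
    fn update(&self, threshold: i64) {
        *self.threshold.write().unwrap() = threshold;
        self.generation.fetch_add(1, Ordering::SeqCst);
    }
    fn snapshot_generation(&self) -> u64 {
        self.generation.load(Ordering::SeqCst)
    }
    fn snapshot(&self) -> i64 {
        *self.threshold.read().unwrap()
    }
}

// Hypothetical incremental pruner that caches its predicate and rebuilds it
// only when the source's generation changes.
struct Pruner {
    source: Arc<LiveThreshold>,
    cached_generation: u64,
    cached_threshold: i64,
}

impl Pruner {
    fn new(source: Arc<LiveThreshold>) -> Self {
        let cached_generation = source.snapshot_generation();
        let cached_threshold = source.snapshot();
        Pruner { source, cached_generation, cached_threshold }
    }

    // A row group can only match `ticker > threshold` if its max exceeds it.
    fn keep_row_group(&mut self, max: i64) -> bool {
        let generation = self.source.snapshot_generation();
        if generation != self.cached_generation {
            self.cached_threshold = self.source.snapshot();
            self.cached_generation = generation;
        }
        max > self.cached_threshold
    }
}

fn count_kept(pruner: &mut Pruner, row_group_maxes: &[i64]) -> usize {
    row_group_maxes.iter().filter(|&&max| pruner.keep_row_group(max)).count()
}

fn main() {
    let live = Arc::new(LiveThreshold::new(100));
    // Fed the live handle: picks up every update.
    let mut live_pruner = Pruner::new(Arc::clone(&live));
    // Fed a frozen copy, as happens once the simplifier folds the wrapper.
    let mut stale_pruner = Pruner::new(Arc::new(LiveThreshold::new(live.snapshot())));

    live.update(500); // TopK tightens the threshold.

    let maxes: [i64; 4] = [150, 300, 450, 600];
    println!("live keeps {}", count_kept(&mut live_pruner, &maxes));
    println!("stale keeps {}", count_kept(&mut stale_pruner, &maxes));
}
```

With these made-up row-group maxima the live pruner keeps 1 of 4 row groups while the stale one keeps all 4, the same shape as the zero-pruning symptom above.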

Suggested fix

Either:

  1. Add a special-case in `PhysicalExprSimplifier` that recognizes `DynamicFilterPhysicalExpr` and returns it untouched.
  2. Expose a marker trait or method on `PhysicalExpr` (e.g. `is_live_mutable() -> bool` defaulting to `false`) that the simplifier checks before descending, and have `DynamicFilterPhysicalExpr` override it to `true`. This is more general and would cover any future live wrappers.
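Option 2 can be sketched with a trimmed-down trait. Only the `is_live_mutable` name and its `false` default come from the proposal above; `Expr`, `current_inner`, `literal_value`, `Literal`, `StaticWrapper`, `LiveWrapper` and `simplify` are hypothetical stand-ins, not DataFusion's `PhysicalExpr` API. The simplifier checks the marker before descending, so a live wrapper comes back as the same `Arc` while an ordinary wrapper can still be unwrapped.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical, trimmed-down stand-in for `PhysicalExpr`.
trait Expr {
    // Proposed marker: `false` by default, overridden by live wrappers.
    fn is_live_mutable(&self) -> bool {
        false
    }
    // For wrappers: the expression currently wrapped.
    fn current_inner(&self) -> Option<Arc<dyn Expr>> {
        None
    }
    fn literal_value(&self) -> Option<i64> {
        None
    }
}

struct Literal(i64);
impl Expr for Literal {
    fn literal_value(&self) -> Option<i64> {
        Some(self.0)
    }
}

// An ordinary wrapper that the simplifier is allowed to unwrap.
struct StaticWrapper(Arc<dyn Expr>);
impl Expr for StaticWrapper {
    fn current_inner(&self) -> Option<Arc<dyn Expr>> {
        Some(Arc::clone(&self.0))
    }
}

// A live wrapper (think DynamicFilterPhysicalExpr) whose inner can change.
struct LiveWrapper(RwLock<Arc<dyn Expr>>);
impl LiveWrapper {
    fn new(inner: Arc<dyn Expr>) -> Self {
        LiveWrapper(RwLock::new(inner))
    }
    fn update(&self, new_inner: Arc<dyn Expr>) {
        *self.0.write().unwrap() = new_inner;
    }
}
impl Expr for LiveWrapper {
    fn is_live_mutable(&self) -> bool {
        true
    }
    fn current_inner(&self) -> Option<Arc<dyn Expr>> {
        Some(self.0.read().unwrap().clone())
    }
}

// Simplifier sketch: consult the marker before descending into a wrapper.
fn simplify(expr: Arc<dyn Expr>) -> Arc<dyn Expr> {
    if expr.is_live_mutable() {
        // Treat live wrappers as opaque: hand back the very same Arc.
        return expr;
    }
    match expr.current_inner() {
        // Non-live wrappers may be replaced by their simplified contents.
        Some(inner) => simplify(inner),
        None => expr,
    }
}

fn main() {
    let live = Arc::new(LiveWrapper::new(Arc::new(Literal(100))));
    let live_dyn: Arc<dyn Expr> = live.clone();
    let out = simplify(Arc::clone(&live_dyn));
    live.update(Arc::new(Literal(500)));
    // `out` is still the live wrapper, so it reports the updated value.
    let seen = out.current_inner().and_then(|e| e.literal_value());
    println!("live: same Arc = {}, sees {:?}", Arc::ptr_eq(&live_dyn, &out), seen);

    let plain: Arc<dyn Expr> = Arc::new(StaticWrapper(Arc::new(Literal(7))));
    println!("static: folded to {:?}", simplify(plain).literal_value());
}
```

Putting the check on the trait, rather than downcasting to one concrete type as option 1 would, is what lets this approach cover future live wrappers without touching the simplifier again.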

Happy to open a PR once we agree on direction.

Workaround

Use the schema adapter alone (`DefaultPhysicalExprAdapterFactory.create(logical, physical)` followed by `rewrite(...)`) without composing it with `PhysicalExprSimplifier` when the predicate may contain a `DynamicFilterPhysicalExpr`. The adapter handles column-reference remapping and casts correctly; the simplifier's null-folding step is not necessary when the input predicate is a dynamic filter (its column set is fixed by construction, so no missing-column substitution can fire).

Additional context

DataFusion version: 53 (also reproduces on main as of 2026-06-15).
