Skip to content

feat(physical-expr): DynamicFilterTracker for cheap dynamic-filter change detection#22460

Draft
adriangb wants to merge 2 commits into
apache:mainfrom
adriangb:dynamic-filter-tracker
Draft

feat(physical-expr): DynamicFilterTracker for cheap dynamic-filter change detection#22460
adriangb wants to merge 2 commits into
apache:mainfrom
adriangb:dynamic-filter-tracker

Conversation

@adriangb
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

  • Closes #.

N/A — extracted from a design discussion around the duplicated "does this filter have a dynamic portion that might change?" / "has the filter changed?" patterns (e.g. #22450, FilePruner). Happy to file a tracking issue if preferred.

Rationale for this change

DynamicFilterPhysicalExpr has a rich producer API (update(), mark_complete(), wait_update(), wait_complete()), but consumers that hold a predicate which contains dynamic filters have only a bare, recursive snapshot_generation() -> u64. Several call sites hand-roll the same boilerplate around it:

  • store a last_generation: Option<u64>,
  • recompute snapshot_generation(&predicate) (a full tree walk) on every check,
  • diff it, and rebuild an expensive derived artifact (e.g. a PruningPredicate) on change.

FilePruner does exactly this today, and the runtime row-group pruner in #22450 reimplements the identical dance. None of them exploit mark_complete(), so they keep re-walking the tree even after the filters can no longer change.

This PR adds a small consumer-side counterpart so the pattern lives in one place.

What changes are included in this PR?

New API (datafusion-physical-expr):

  • DynamicFilterPhysicalExpr::subscribe() -> DynamicFilterSubscription and is_complete(). A subscription observes a single filter through its existing tokio::sync::watch channel — steady-state polling is a single atomic load; the lock is taken only when the filter actually moved. A bare mark_complete() (which re-broadcasts the current generation) is distinguished from a real expression change, so it does not trigger a spurious rebuild.
  • DynamicFilterTracker walks a (possibly composite) predicate once, subscribing to every still-incomplete dynamic filter, then answers changed() by polling only that shrinking set (completed filters are dropped). No more re-folding snapshot_generation() over the whole tree on every batch.
  • DynamicFilterTracking::classify returns Static / AllComplete / Watching(..) in one traversal, so a caller can decide both "is a one-shot prune worthwhile?" and "do I need to keep re-checking?".

Consumer migration:

  • FilePruner now classifies its predicate once at construction and rebuilds the pruning predicate on the first check + only when a watched filter moves.
  • The Parquet opener skips wrapping the scan in EarlyStoppingStream when the predicate is Static/AllComplete — the up-front prune_file check already captured everything such a predicate can prune, so per-batch re-checking was pure overhead.

This is intentionally scoped as a draft. Natural follow-ups: migrate the runtime RowGroupPruner (#22450) onto the same tracker; replace the remaining is_dynamic_physical_expr / free-function snapshot_generation call sites; and (separately) decide whether the snapshot_generation() trait method + FFI entry can eventually be retired.

Are these changes tested?

Yes — new unit tests in dynamic_filter_tracker.rs cover: static / already-complete / watching classification, detecting an update exactly once, mark_complete() not counting as a change, a coalesced update+complete reported once, and independent tracking of multiple filters in a composite predicate. Existing datafusion-pruning and datafusion-datasource-parquet suites pass unchanged.

Are there any user-facing changes?

New public API in datafusion-physical-expr (DynamicFilterTracker, DynamicFilterTracking, DynamicFilterSubscription, DynamicFilterChange, and DynamicFilterPhysicalExpr::{subscribe, is_complete}). No behavior change for end users beyond avoiding redundant per-batch re-pruning work for non-dynamic predicates.

🤖 Generated with Claude Code

adriangb and others added 2 commits May 22, 2026 09:55
Introduce a consumer-side counterpart to the producer's `update()` /
`mark_complete()` API on `DynamicFilterPhysicalExpr`.

`DynamicFilterPhysicalExpr::subscribe()` returns a `DynamicFilterSubscription`
that observes updates to a single filter through its existing `watch` channel:
steady-state polling is a single atomic load, and the lock is taken only when
the filter has actually moved. A bare `mark_complete()` (which re-broadcasts the
current generation) is correctly distinguished from a real expression change.

`DynamicFilterTracker` walks a (possibly composite) predicate once, subscribing
to every still-incomplete dynamic filter, then answers "did anything change
since I last looked?" by polling only that shrinking set — replacing the pattern
of recursively folding `snapshot_generation()` over the whole tree on every
check. `DynamicFilterTracking::classify` distinguishes Static / AllComplete /
Watching in a single traversal so callers can decide both whether a one-shot
prune is worthwhile and whether runtime re-checking is needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace `FilePruner`'s hand-rolled `snapshot_generation()` polling (store last
`u64`, recompute + diff on every `should_prune`) with a `DynamicFilterTracking`
classification computed once at construction. The pruner now rebuilds the
pruning predicate on the first check and thereafter only when a watched dynamic
filter has actually moved.

This also lets the Parquet opener skip wrapping the scan in `EarlyStoppingStream`
when the predicate is static or its dynamic filters are already complete: the
up-front `prune_file` check already captured everything such a predicate can
prune, so per-batch re-checking was pure overhead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates datasource Changes to the datasource crate labels May 22, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

datasource Changes to the datasource crate physical-expr Changes to the physical-expr crates

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant