diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index f138a26bf4701..5b40a947d9ea4 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -52,9 +52,7 @@ use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_ use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; -use datafusion_physical_expr_common::physical_expr::{ - PhysicalExpr, is_dynamic_physical_expr, -}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, @@ -618,18 +616,19 @@ impl ParquetMorselizer { .with_category(MetricCategory::Rows) .global_counter("num_predicate_creation_errors"); - // Apply literal replacements to projection and predicate - let file_pruner = predicate - .as_ref() - .filter(|p| is_dynamic_physical_expr(p) || partitioned_file.has_statistics()) - .and_then(|p| { - FilePruner::try_new( - Arc::clone(p), - &logical_file_schema, - &partitioned_file, - predicate_creation_errors.clone(), - ) - }); + // `FilePruner::try_new` decides whether a pruner is worthwhile (it needs + // a statistics struct, and either real column statistics or a dynamic + // filter that can prune via partition-value folding) and returns `None` + // otherwise. For a static predicate the pruner's tracker reports no + // changes, so it runs once and adds no ongoing cost. + let file_pruner = predicate.as_ref().and_then(|p| { + FilePruner::try_new( + Arc::clone(p), + &logical_file_schema, + &partitioned_file, + predicate_creation_errors.clone(), + ) + }); Ok(PreparedParquetOpen { partition_index: self.partition_index, @@ -677,30 +676,21 @@ impl PreparedParquetOpen { /// Returns `None` if the file can be skipped completely. fn prune_file(mut self) -> Result> { // Prune this file using the file level statistics and partition values. - // Since dynamic filters may have been updated since planning it is possible that we are able - // to prune files now that we couldn't prune at planning time. - // It is assumed that there is no point in doing pruning here if the predicate is not dynamic, - // as it would have been done at planning time. - // We'll also check this after every record batch we read, - // and if at some point we are able to prove we can prune the file using just the file level statistics - // we can end the stream early. - // - // Make a FilePruner only if there is either - // 1. a dynamic expr in the predicate - // 2. the file has file-level statistics. - // - // File-level statistics may prune the file without loading - // any row groups or metadata. + // Since dynamic filters may have been updated since planning it is + // possible that we are able to prune files now that we couldn't prune at + // planning time. The `FilePruner` (built when the predicate is dynamic or + // the file carries statistics) also watches any still-active dynamic + // filter, so the + // `EarlyStoppingStream` wrapping the scan can re-check after each batch + // and end the stream early once a tightened filter proves the file can + // be skipped. // - // Dynamic filters may prune the file after initial - // planning, as the dynamic filter is updated during - // execution. - // - // The case where there is a dynamic filter but no - // statistics corresponds to a dynamic filter that - // references partition columns. While rare, this is possible - // e.g. `select * from table order by partition_col limit - // 10` could hit this condition. + // File-level statistics may prune the file without loading any row + // groups or metadata. Partition column predicates are already folded to + // literals (see `replace_columns_with_literals` above), so a dynamic + // filter that references only partition columns can prune here too even + // when the file has no column statistics, e.g. + // `select * from t order by partition_col limit 10`. if let Some(file_pruner) = &mut self.file_pruner && file_pruner.should_prune()? { @@ -1247,16 +1237,21 @@ impl RowGroupsPrunedParquetOpen { } .into_stream(); - // Wrap the stream so a dynamic filter can stop the file scan early. - if let Some(file_pruner) = prepared.file_pruner { - Ok(EarlyStoppingStream::new( - stream, - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) - } else { - Ok(stream) + // Wrap the stream so a dynamic filter can stop the file scan early, but + // only when the pruner is still watching a filter that can change + // mid-scan. For a static (or already-complete) predicate the up-front + // `prune_file` check already captured everything that can be pruned, so + // per-batch re-checking would only add overhead. + match prepared.file_pruner { + Some(file_pruner) if file_pruner.is_watching() => { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } + _ => Ok(stream), } } } diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 887ed73745c73..0e0efaf758f69 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -870,6 +870,12 @@ pub fn snapshot_generation(expr: &Arc) -> u64 { /// Check if the given `PhysicalExpr` is dynamic. /// Internally this calls [`snapshot_generation`] to check if the generation is non-zero, /// any dynamic `PhysicalExpr` should have a non-zero generation. +#[deprecated( + since = "55.0.0", + note = "Downcast to `DynamicFilterPhysicalExpr`, or use \ + `DynamicFilterTracking::classify(expr).contains_dynamic_filter()` from \ + `datafusion_physical_expr`" +)] pub fn is_dynamic_physical_expr(expr: &Arc) -> bool { // If the generation is non-zero, then this `PhysicalExpr` is dynamic. snapshot_generation(expr) != 0 diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs similarity index 92% rename from datafusion/physical-expr/src/expressions/dynamic_filters.rs rename to datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs index 5b9de882160aa..6ef848e218fb1 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs @@ -29,6 +29,9 @@ use datafusion_common::{ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::DynHash; +mod tracker; +pub use tracker::{DynamicFilterTracker, DynamicFilterTracking}; + /// State of a dynamic filter, tracking both updates and completion. #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum FilterState { @@ -326,6 +329,31 @@ impl DynamicFilterPhysicalExpr { .await; } + /// Returns `true` if this filter has been marked complete via + /// [`Self::mark_complete`] and will therefore never change again. + pub(crate) fn is_complete(&self) -> bool { + self.inner.read().is_complete + } + + /// Subscribe to this filter's updates for cheap, synchronous change + /// detection. + /// + /// The returned [`DynamicFilterSubscription`] lets a consumer poll whether + /// the filter's expression has advanced since it last looked, without + /// re-walking a predicate tree or re-deriving a generation on every check. + /// This is the building block used by [`DynamicFilterTracker`] to watch + /// every dynamic filter inside a (possibly composite) predicate. + pub(crate) fn subscribe(&self) -> DynamicFilterSubscription { + let mut receiver = self.state_watch.subscribe(); + // Mark the current state as already-seen so the first `observe()` only + // reports updates that happen *after* subscription. + let last_generation = receiver.borrow_and_update().generation(); + DynamicFilterSubscription { + receiver, + last_generation, + } + } + /// Check if this dynamic filter is being actively used by any consumers. /// /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec @@ -522,6 +550,66 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } } +/// The result of polling a [`DynamicFilterSubscription`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct DynamicFilterChange { + /// The filter's expression advanced since the previous observation. + pub(crate) changed: bool, + /// The filter has been marked complete; it will never change again and the + /// subscription can be dropped. + pub(crate) complete: bool, +} + +/// A cheap, synchronous handle for observing updates to a single +/// [`DynamicFilterPhysicalExpr`]. +/// +/// Obtained via [`DynamicFilterPhysicalExpr::subscribe`]. Steady-state polling +/// via [`Self::observe`] is a single atomic load (the underlying +/// [`tokio::sync::watch`] version counter); the lock is only taken when the +/// filter has actually been updated. +#[derive(Debug)] +pub(crate) struct DynamicFilterSubscription { + receiver: watch::Receiver, + /// Last generation we reported as "seen". Used to distinguish a real + /// expression update from a bare [`DynamicFilterPhysicalExpr::mark_complete`] + /// (which re-broadcasts the current generation without changing the + /// expression). + last_generation: u64, +} + +impl DynamicFilterSubscription { + /// Observe the latest state of the filter. + /// + /// Reports whether the filter's expression advanced since the previous call + /// and whether it has since been marked complete. Cheap when nothing has + /// changed: a single atomic comparison with no lock acquisition. + pub(crate) fn observe(&mut self) -> DynamicFilterChange { + match self.receiver.has_changed() { + Ok(true) => { + let state = *self.receiver.borrow_and_update(); + let changed = state.generation() > self.last_generation; + if changed { + self.last_generation = state.generation(); + } + DynamicFilterChange { + changed, + complete: matches!(state, FilterState::Complete { .. }), + } + } + Ok(false) => DynamicFilterChange { + changed: false, + complete: false, + }, + // The sender was dropped: no further updates are possible, so treat + // the subscription as complete. + Err(_) => DynamicFilterChange { + changed: false, + complete: true, + }, + } + } +} + /// An atomic counter used to generate monotonic u64 ids. struct ExpressionIdAtomicCounter { inner: AtomicU64, diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters/tracker.rs b/datafusion/physical-expr/src/expressions/dynamic_filters/tracker.rs new file mode 100644 index 0000000000000..c66f319c7f9f3 --- /dev/null +++ b/datafusion/physical-expr/src/expressions/dynamic_filters/tracker.rs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tracking changes to the dynamic filters inside a predicate. +//! +//! Several operators (Parquet file/row-group pruning, remote execution, ...) +//! hold a predicate that *may* contain one or more +//! [`DynamicFilterPhysicalExpr`] nodes which are updated during execution +//! (e.g. a `TopK` tightening its threshold, or a `HashJoinExec` publishing the +//! build-side bounds). These consumers repeatedly ask two questions: +//! +//! 1. *"Does this predicate contain anything that can still change?"* — to +//! decide whether it is worth setting up runtime re-pruning at all. +//! 2. *"Has it changed since I last looked?"* — to decide whether to rebuild an +//! expensive derived artifact (e.g. a `PruningPredicate`). +//! +//! Historically each call site answered these by recursively folding +//! [`PhysicalExpr::snapshot_generation`] over the whole tree on *every* check +//! and diffing the resulting `u64`. [`DynamicFilterTracker`] replaces that with +//! a single up-front walk that subscribes to each still-incomplete dynamic +//! filter; subsequent checks only poll the (shrinking) set of subscriptions, +//! each of which is a cheap atomic load in the common "nothing changed" case. +//! +//! [`PhysicalExpr::snapshot_generation`]: crate::PhysicalExpr::snapshot_generation + +use std::sync::Arc; + +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; + +use crate::PhysicalExpr; + +use super::{DynamicFilterPhysicalExpr, DynamicFilterSubscription}; + +/// Classification of a predicate according to the dynamic filters it contains. +/// +/// Produced by [`DynamicFilterTracking::classify`] with a single tree walk so +/// callers can answer both "is it worth pruning at all?" and "do I need to keep +/// watching?" without traversing the predicate twice. +#[derive(Debug)] +pub enum DynamicFilterTracking { + /// The predicate contains no [`DynamicFilterPhysicalExpr`] at all. It is + /// fully static and will never change. + Static, + /// The predicate contains one or more dynamic filters, but all of them have + /// already been marked complete. Their *current* values may differ from + /// what was known at planning time (so a one-shot prune is still + /// worthwhile), but they will not change again — there is nothing to watch. + AllComplete, + /// The predicate contains at least one dynamic filter that can still change. + /// The embedded [`DynamicFilterTracker`] should be polled to detect updates. + Watching(DynamicFilterTracker), +} + +impl DynamicFilterTracking { + /// Walk `predicate` once and classify its dynamic-filter content, + /// subscribing to every filter that is not yet complete. + pub fn classify(predicate: &Arc) -> Self { + let mut subscriptions = Vec::new(); + let mut found_any = false; + predicate + .apply(|expr| { + if let Some(filter) = expr.downcast_ref::() { + found_any = true; + // Already-complete filters can never change again, so there + // is no point subscribing to them. + if !filter.is_complete() { + subscriptions.push(filter.subscribe()); + } + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("traversal closure is infallible"); + + if !found_any { + DynamicFilterTracking::Static + } else if subscriptions.is_empty() { + DynamicFilterTracking::AllComplete + } else { + DynamicFilterTracking::Watching(DynamicFilterTracker { subscriptions }) + } + } + + /// `true` if the predicate contains any dynamic filter (complete or not), + /// i.e. its value may differ from what was known at planning time and is + /// therefore worth re-evaluating at least once. + pub fn contains_dynamic_filter(&self) -> bool { + !matches!(self, DynamicFilterTracking::Static) + } + + /// Mutable access to the underlying tracker when there is still something to + /// watch. + pub fn watcher(&mut self) -> Option<&mut DynamicFilterTracker> { + match self { + DynamicFilterTracking::Watching(tracker) => Some(tracker), + _ => None, + } + } +} + +/// Watches every still-incomplete [`DynamicFilterPhysicalExpr`] reachable from a +/// predicate and reports, cheaply, whether any of them has been updated since +/// the last check. +/// +/// Obtain one from [`DynamicFilterTracking::classify`] via +/// [`DynamicFilterTracking::watcher`]; the `Watching` variant carries it only +/// when there is at least one dynamic filter that can still change. +#[derive(Debug)] +pub struct DynamicFilterTracker { + /// Subscriptions to the not-yet-complete dynamic filters. Entries are + /// dropped as their filters complete, so the set only shrinks. + subscriptions: Vec, +} + +impl DynamicFilterTracker { + /// Returns `true` if any watched filter's expression has advanced since the + /// previous call. + /// + /// Filters that have completed are dropped from the watch set as they are + /// observed; once every filter has completed this is a no-op that always + /// returns `false`. + pub fn changed(&mut self) -> bool { + let mut changed = false; + self.subscriptions.retain_mut(|subscription| { + let change = subscription.observe(); + changed |= change.changed; + // Keep the subscription only while the filter can still change. + !change.complete + }); + changed + } +} + +#[cfg(test)] +impl DynamicFilterTracker { + /// Build a tracker directly, or `None` if `predicate` has no dynamic filter + /// that can still change. Test-only; production builds a tracker via + /// [`DynamicFilterTracking::classify`] + [`DynamicFilterTracking::watcher`]. + fn try_new(predicate: &Arc) -> Option { + match DynamicFilterTracking::classify(predicate) { + DynamicFilterTracking::Watching(tracker) => Some(tracker), + DynamicFilterTracking::Static | DynamicFilterTracking::AllComplete => None, + } + } + + /// `true` once every watched filter has completed and been dropped. + fn is_exhausted(&self) -> bool { + self.subscriptions.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::expressions::{BinaryExpr, col, lit}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::Operator; + + /// `col > ` where the dynamic filter starts as `lit(true)`. + fn dynamic_predicate() -> (Arc, Arc) { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let column = col("a", &schema).unwrap(); + let filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&column)], + lit(true), + )); + let predicate = Arc::new(BinaryExpr::new( + column, + Operator::Gt, + Arc::clone(&filter) as Arc, + )) as Arc; + (predicate, filter) + } + + #[test] + fn static_predicate_is_not_watched() { + let predicate = lit(true); + assert!(matches!( + DynamicFilterTracking::classify(&predicate), + DynamicFilterTracking::Static + )); + assert!(DynamicFilterTracker::try_new(&predicate).is_none()); + } + + #[test] + fn already_complete_filter_is_not_watched() { + let (predicate, filter) = dynamic_predicate(); + filter.mark_complete(); + + match DynamicFilterTracking::classify(&predicate) { + DynamicFilterTracking::AllComplete => {} + other => panic!("expected AllComplete, got {other:?}"), + } + // Still reported as dynamic (worth a one-shot prune)... + assert!(DynamicFilterTracking::classify(&predicate).contains_dynamic_filter()); + // ...but there is nothing to watch. + assert!(DynamicFilterTracker::try_new(&predicate).is_none()); + } + + #[test] + fn detects_update_exactly_once() { + let (predicate, filter) = dynamic_predicate(); + let mut tracker = DynamicFilterTracker::try_new(&predicate) + .expect("predicate has an incomplete dynamic filter"); + + // No update yet. + assert!(!tracker.changed()); + + filter.update(lit(false)).unwrap(); + // The update is reported once... + assert!(tracker.changed()); + // ...and not repeatedly. + assert!(!tracker.changed()); + } + + #[test] + fn mark_complete_does_not_count_as_a_change() { + let (predicate, filter) = dynamic_predicate(); + let mut tracker = DynamicFilterTracker::try_new(&predicate).unwrap(); + + filter.update(lit(false)).unwrap(); + assert!(tracker.changed()); + + // `mark_complete()` re-broadcasts the current generation without + // changing the expression: it must not trigger a spurious rebuild. + filter.mark_complete(); + assert!(!tracker.changed()); + // The filter has completed, so the tracker drains itself. + assert!(tracker.is_exhausted()); + } + + #[test] + fn coalesced_update_then_complete_is_one_change() { + let (predicate, filter) = dynamic_predicate(); + let mut tracker = DynamicFilterTracker::try_new(&predicate).unwrap(); + + // Update and complete before the tracker gets a chance to observe. + // The watch channel only retains the latest value, so the tracker sees + // `Complete` directly; it must still report the (final) change once. + filter.update(lit(false)).unwrap(); + filter.mark_complete(); + + assert!(tracker.changed()); + assert!(tracker.is_exhausted()); + assert!(!tracker.changed()); + } + + #[test] + fn watches_multiple_filters_independently() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + let col_a = col("a", &schema).unwrap(); + let col_b = col("b", &schema).unwrap(); + let filter_a = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + lit(true), + )); + let filter_b = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_b)], + lit(true), + )); + let predicate = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + col_a, + Operator::Gt, + Arc::clone(&filter_a) as Arc, + )), + Operator::And, + Arc::new(BinaryExpr::new( + col_b, + Operator::Lt, + Arc::clone(&filter_b) as Arc, + )), + )) as Arc; + + let mut tracker = DynamicFilterTracker::try_new(&predicate).unwrap(); + assert!(!tracker.changed()); + + filter_a.update(lit(false)).unwrap(); + assert!(tracker.changed()); + assert!(!tracker.changed()); + + filter_b.update(lit(false)).unwrap(); + assert!(tracker.changed()); + assert!(!tracker.changed()); + + // Completing one filter leaves the other still watched. + filter_a.mark_complete(); + assert!(!tracker.changed()); + assert!(!tracker.is_exhausted()); + + filter_b.mark_complete(); + assert!(!tracker.changed()); + assert!(tracker.is_exhausted()); + } +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 7cf874c448ea0..05a04f88dcadf 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -45,7 +45,10 @@ pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner}; +pub use dynamic_filters::{ + DynamicFilterPhysicalExpr, DynamicFilterTracker, DynamicFilterTracking, + Inner as DynamicFilterInner, +}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 848bf81d15979..45b4a4b8619c0 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -58,6 +58,7 @@ pub use analysis::{AnalysisContext, ExprBoundaries, analyze}; pub use equivalence::{ AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union, }; +pub use expressions::{DynamicFilterTracker, DynamicFilterTracking}; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering, diff --git a/datafusion/pruning/src/file_pruner.rs b/datafusion/pruning/src/file_pruner.rs index f850e0c0114fb..661832915c40f 100644 --- a/datafusion/pruning/src/file_pruner.rs +++ b/datafusion/pruning/src/file_pruner.rs @@ -22,7 +22,8 @@ use std::sync::Arc; use arrow::datatypes::{FieldRef, SchemaRef}; use datafusion_common::{Result, internal_datafusion_err, pruning::PrunableStatistics}; use datafusion_datasource::PartitionedFile; -use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, snapshot_generation}; +use datafusion_physical_expr::DynamicFilterTracking; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::Count; use log::debug; @@ -34,8 +35,14 @@ use crate::build_pruning_predicate; /// which substitutes partition column references with their literal values before /// the predicate reaches this pruner. pub struct FilePruner { - predicate_generation: Option, predicate: Arc, + /// Tracks the dynamic filters inside `predicate` so we only rebuild the + /// pruning predicate when one of them has actually moved. + tracking: DynamicFilterTracking, + /// Whether [`Self::should_prune`] has built+evaluated the pruning predicate + /// at least once. The first check always runs; subsequent checks only run + /// when a watched dynamic filter changed. + checked_once: bool, /// Schema used for pruning (the logical file schema). file_schema: SchemaRef, file_stats_pruning: PrunableStatistics, @@ -69,42 +76,72 @@ impl FilePruner { }) } - /// Create a new file pruner if statistics are available. - /// Returns None if this file does not have statistics. + /// Create a file pruner for this file, or `None` when pruning it cannot + /// help. + /// + /// Returns `None` when the file has no statistics struct to evaluate a + /// pruning predicate against, or when the predicate is purely static and the + /// file has no usable column statistics — in that case planning already did + /// everything such a pruner could. A predicate carrying a dynamic filter is + /// always accepted (given a statistics struct), since it may prune via + /// partition-value folding even without column statistics. pub fn try_new( predicate: Arc, file_schema: &SchemaRef, partitioned_file: &PartitionedFile, predicate_creation_errors: Count, ) -> Option { + // A pruning predicate is evaluated against a statistics struct, so one + // must exist (its columns may all be `Absent`). let file_stats = partitioned_file.statistics.as_ref()?; + let tracking = DynamicFilterTracking::classify(&predicate); + // Only build a pruner when it could prune something planning didn't + // already: the file has real column statistics, or the predicate carries + // a dynamic filter (whose value, or folded partition columns, can prune + // even without column statistics). For a purely static predicate with no + // usable stats there is nothing to gain. + if !partitioned_file.has_statistics() && !tracking.contains_dynamic_filter() { + return None; + } let file_stats_pruning = PrunableStatistics::new(vec![file_stats.clone()], Arc::clone(file_schema)); Some(Self { - predicate_generation: None, predicate, + tracking, + checked_once: false, file_schema: Arc::clone(file_schema), file_stats_pruning, predicate_creation_errors, }) } + /// Returns `true` if this pruner watches a dynamic filter that can still + /// change, meaning [`Self::should_prune`] is worth re-checking as the scan + /// progresses. When `false`, the predicate is effectively static for the + /// remainder of the scan and the caller can avoid wrapping the stream in a + /// per-batch re-pruning adapter. + pub fn is_watching(&self) -> bool { + matches!(self.tracking, DynamicFilterTracking::Watching(_)) + } + pub fn should_prune(&mut self) -> Result { - // Check if the predicate has changed since last invocation by tracking - // its "generation". Dynamic filter expressions can change their values - // during query execution, so we use generation tracking to detect when - // the predicate has been updated and needs to be rebuilt. + // Building the pruning predicate is expensive (it involves expression + // analysis), so we only do it on the first check and whenever a dynamic + // filter inside the predicate has actually moved. // - // If the generation hasn't changed, we can skip rebuilding the pruning - // predicate, which is an expensive operation involving expression analysis. - let new_generation = snapshot_generation(&self.predicate); - if let Some(current_generation) = self.predicate_generation.as_mut() { - if *current_generation == new_generation { - return Ok(false); - } - *current_generation = new_generation; + // Dynamic filter expressions can change their values during query + // execution; `DynamicFilterTracking` watches the still-incomplete + // filters and reports a change at most once per update. A purely static + // predicate (or one whose dynamic filters have all completed) is checked + // exactly once. + let should_build = if self.checked_once { + self.tracking.watcher().is_some_and(|w| w.changed()) } else { - self.predicate_generation = Some(new_generation); + self.checked_once = true; + true + }; + if !should_build { + return Ok(false); } let pruning_predicate = build_pruning_predicate( Arc::clone(&self.predicate), diff --git a/docs/source/library-user-guide/upgrading/55.0.0.md b/docs/source/library-user-guide/upgrading/55.0.0.md new file mode 100644 index 0000000000000..e9a86332cfc49 --- /dev/null +++ b/docs/source/library-user-guide/upgrading/55.0.0.md @@ -0,0 +1,72 @@ + + +# Upgrade Guides + +## DataFusion 55.0.0 + +**Note:** DataFusion `55.0.0` has not been released yet. The information provided +in this section pertains to features and changes that have already been merged +to the main branch and are awaiting release in this version. + +### `is_dynamic_physical_expr` is deprecated + +`datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr` is +deprecated. It was a thin wrapper over `snapshot_generation(expr) != 0` used to +ask "does this predicate contain a dynamic filter?". + +Prefer asking the question directly against the concrete type. For a one-off +check, downcast to `DynamicFilterPhysicalExpr`: + +```rust +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; + +let mut is_dynamic = false; +predicate.apply(|e| { + if e.downcast_ref::().is_some() { + is_dynamic = true; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } +})?; +``` + +If you also need to know whether the dynamic filters can still change (and to be +notified when they do), use the new `DynamicFilterTracking` / +`DynamicFilterTracker` API in `datafusion_physical_expr`: + +```rust +use datafusion_physical_expr::DynamicFilterTracking; + +let tracking = DynamicFilterTracking::classify(&predicate); +if tracking.contains_dynamic_filter() { + // worth re-evaluating the predicate at runtime +} +``` + +### `FilePruner::try_new` no longer builds a pruner for static predicates without statistics + +`datafusion_pruning::FilePruner::try_new` now returns `None` when the predicate +is purely static _and_ the file carries no usable column statistics, because +such a pruner can never prune anything beyond what planning already did. +Previously it returned `Some` whenever a statistics struct was present (the +"is this worth pruning?" decision lived in the Parquet opener). Files with column +statistics, and predicates that carry a dynamic filter, are unaffected. diff --git a/docs/source/library-user-guide/upgrading/index.rst b/docs/source/library-user-guide/upgrading/index.rst index 1ed5eca2a5d2a..51c7f1413172b 100644 --- a/docs/source/library-user-guide/upgrading/index.rst +++ b/docs/source/library-user-guide/upgrading/index.rst @@ -21,6 +21,7 @@ Upgrade Guides .. toctree:: :maxdepth: 1 + DataFusion 55.0.0 <55.0.0> DataFusion 54.0.0 <54.0.0> DataFusion 53.0.0 <53.0.0> DataFusion 52.0.0 <52.0.0>