Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 43 additions & 48 deletions datafusion/datasource-parquet/src/opener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_
use datafusion_datasource::{PartitionedFile, TableSchema};
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::{
PhysicalExpr, is_dynamic_physical_expr,
};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
Expand Down Expand Up @@ -618,18 +616,19 @@ impl ParquetMorselizer {
.with_category(MetricCategory::Rows)
.global_counter("num_predicate_creation_errors");

// Apply literal replacements to projection and predicate
let file_pruner = predicate
.as_ref()
.filter(|p| is_dynamic_physical_expr(p) || partitioned_file.has_statistics())
.and_then(|p| {
FilePruner::try_new(
Arc::clone(p),
&logical_file_schema,
&partitioned_file,
predicate_creation_errors.clone(),
)
});
// `FilePruner::try_new` decides whether a pruner is worthwhile (it needs
// a statistics struct, and either real column statistics or a dynamic
// filter that can prune via partition-value folding) and returns `None`
// otherwise. For a static predicate the pruner's tracker reports no
// changes, so it runs once and adds no ongoing cost.
let file_pruner = predicate.as_ref().and_then(|p| {
FilePruner::try_new(
Arc::clone(p),
&logical_file_schema,
&partitioned_file,
predicate_creation_errors.clone(),
)
});

Ok(PreparedParquetOpen {
partition_index: self.partition_index,
Expand Down Expand Up @@ -677,30 +676,21 @@ impl PreparedParquetOpen {
/// Returns `None` if the file can be skipped completely.
fn prune_file(mut self) -> Result<Option<Self>> {
// Prune this file using the file level statistics and partition values.
// Since dynamic filters may have been updated since planning it is possible that we are able
// to prune files now that we couldn't prune at planning time.
// It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
// as it would have been done at planning time.
// We'll also check this after every record batch we read,
// and if at some point we are able to prove we can prune the file using just the file level statistics
// we can end the stream early.
//
// Make a FilePruner only if there is either
// 1. a dynamic expr in the predicate
// 2. the file has file-level statistics.
//
// File-level statistics may prune the file without loading
// any row groups or metadata.
// Dynamic filters may have been updated since planning, so we may be able
// to prune files now that could not be pruned at planning time. The
// `FilePruner` (built when the predicate is dynamic or the file carries
// statistics) also watches any still-active dynamic filter, so the
// `EarlyStoppingStream` wrapping the scan can re-check after each batch
// and end the stream early once a tightened filter proves the file can
// be skipped.
//
// Dynamic filters may prune the file after initial
// planning, as the dynamic filter is updated during
// execution.
//
// The case where there is a dynamic filter but no
// statistics corresponds to a dynamic filter that
// references partition columns. While rare, this is possible
// e.g. `select * from table order by partition_col limit
// 10` could hit this condition.
// File-level statistics may prune the file without loading any row
// groups or metadata. Partition column predicates are already folded to
// literals (see `replace_columns_with_literals` above), so a dynamic
// filter that references only partition columns can prune here too even
// when the file has no column statistics, e.g.
// `select * from t order by partition_col limit 10`.
if let Some(file_pruner) = &mut self.file_pruner
&& file_pruner.should_prune()?
{
Expand Down Expand Up @@ -1247,16 +1237,21 @@ impl RowGroupsPrunedParquetOpen {
}
.into_stream();

// Wrap the stream so a dynamic filter can stop the file scan early.
if let Some(file_pruner) = prepared.file_pruner {
Ok(EarlyStoppingStream::new(
stream,
file_pruner,
files_ranges_pruned_statistics,
)
.boxed())
} else {
Ok(stream)
// Wrap the stream so a dynamic filter can stop the file scan early, but
// only when the pruner is still watching a filter that can change
// mid-scan. For a static (or already-complete) predicate the up-front
// `prune_file` check already captured everything that can be pruned, so
// per-batch re-checking would only add overhead.
match prepared.file_pruner {
Some(file_pruner) if file_pruner.is_watching() => {
Ok(EarlyStoppingStream::new(
stream,
file_pruner,
files_ranges_pruned_statistics,
)
.boxed())
}
_ => Ok(stream),
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,12 @@ pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 {
/// Check if the given `PhysicalExpr` is dynamic.
///
/// Internally this calls [`snapshot_generation`] and checks whether the
/// generation is non-zero; any dynamic `PhysicalExpr` should have a non-zero
/// generation.
#[deprecated(
since = "55.0.0",
note = "Downcast to `DynamicFilterPhysicalExpr`, or use \
`DynamicFilterTracking::classify(expr).contains_dynamic_filter()` from \
`datafusion_physical_expr`"
)]
pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
// If the generation is non-zero, then this `PhysicalExpr` is dynamic.
snapshot_generation(expr) != 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ use datafusion_common::{
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::DynHash;

mod tracker;
pub use tracker::{DynamicFilterTracker, DynamicFilterTracking};

/// State of a dynamic filter, tracking both updates and completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterState {
Expand Down Expand Up @@ -326,6 +329,31 @@ impl DynamicFilterPhysicalExpr {
.await;
}

/// Reports whether [`Self::mark_complete`] has already been called on this
/// filter, meaning its expression will never change again.
pub(crate) fn is_complete(&self) -> bool {
    // The read guard is only held long enough to copy the flag out.
    let state = self.inner.read();
    state.is_complete
}

/// Create a subscription for cheap, synchronous change detection on this
/// filter.
///
/// A consumer polls the returned [`DynamicFilterSubscription`] to learn
/// whether the filter's expression has advanced since it last looked,
/// instead of re-walking a predicate tree or re-deriving a generation on
/// every check. [`DynamicFilterTracker`](tracker::DynamicFilterTracker)
/// builds on this to watch every dynamic filter inside a (possibly
/// composite) predicate.
pub(crate) fn subscribe(&self) -> DynamicFilterSubscription {
    let mut watcher = self.state_watch.subscribe();
    // Consume the state that is current right now, so the first `observe()`
    // only reports updates published *after* this subscription was created.
    let seen_generation = {
        let current = watcher.borrow_and_update();
        current.generation()
    };
    DynamicFilterSubscription {
        receiver: watcher,
        last_generation: seen_generation,
    }
}

/// Check if this dynamic filter is being actively used by any consumers.
///
/// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
Expand Down Expand Up @@ -522,6 +550,66 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
}
}

/// The result of polling a [`DynamicFilterSubscription`] via
/// [`DynamicFilterSubscription::observe`].
///
/// Each value describes a single observation only: both flags are `false`
/// when nothing new was broadcast since the previous poll.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct DynamicFilterChange {
/// The filter's expression advanced since the previous observation, i.e.
/// the observed generation is strictly greater than the last one reported.
/// A bare completion broadcast that keeps the same generation does not
/// set this flag.
pub(crate) changed: bool,
/// The filter has been marked complete; it will never change again and the
/// subscription can be dropped.
///
/// Also reported when the filter's sender has been dropped. An observation
/// that sees no new broadcast reports `false` here, even if an earlier
/// observation already reported completion, so callers should act on the
/// first `true` they see.
pub(crate) complete: bool,
}

/// A cheap, synchronous handle for observing updates to a single
/// [`DynamicFilterPhysicalExpr`].
///
/// Obtained via [`DynamicFilterPhysicalExpr::subscribe`]. Steady-state polling
/// via [`Self::observe`] is a single atomic load (the underlying
/// [`tokio::sync::watch`] version counter); the lock is only taken when the
/// filter has actually been updated.
#[derive(Debug)]
pub(crate) struct DynamicFilterSubscription {
/// Receiving half of the filter's [`tokio::sync::watch`] state channel;
/// each new broadcast is consumed by [`Self::observe`].
receiver: watch::Receiver<FilterState>,
/// Last generation we reported as "seen". Used to distinguish a real
/// expression update from a bare [`DynamicFilterPhysicalExpr::mark_complete`]
/// (which re-broadcasts the current generation without changing the
/// expression).
last_generation: u64,
}

impl DynamicFilterSubscription {
    /// Poll the filter for its latest state.
    ///
    /// The returned [`DynamicFilterChange`] says whether the filter's
    /// expression advanced since the previous call and whether this poll saw
    /// the filter marked complete. When nothing was broadcast since the last
    /// poll this is a single atomic comparison with no lock acquisition, and
    /// both flags are `false`.
    pub(crate) fn observe(&mut self) -> DynamicFilterChange {
        let has_update = match self.receiver.has_changed() {
            Ok(flag) => flag,
            // The sender was dropped: no further updates are possible, so
            // treat the subscription as complete.
            Err(_) => {
                return DynamicFilterChange {
                    changed: false,
                    complete: true,
                };
            }
        };

        // Fast path: nothing was broadcast since the previous observation.
        if !has_update {
            return DynamicFilterChange {
                changed: false,
                complete: false,
            };
        }

        // Something was broadcast. Copy the state out and mark it as seen.
        let latest = *self.receiver.borrow_and_update();
        let generation = latest.generation();
        // A completion re-broadcast keeps the same generation, so only a
        // strictly newer generation counts as an expression change.
        let advanced = generation > self.last_generation;
        if advanced {
            self.last_generation = generation;
        }

        DynamicFilterChange {
            changed: advanced,
            complete: matches!(latest, FilterState::Complete { .. }),
        }
    }
}

/// An atomic counter used to generate monotonic u64 ids.
struct ExpressionIdAtomicCounter {
inner: AtomicU64,
Expand Down
Loading
Loading