diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs
index d3f9e886562..04fd7c7d242 100644
--- a/vortex-datafusion/src/persistent/mod.rs
+++ b/vortex-datafusion/src/persistent/mod.rs
@@ -41,7 +41,8 @@ mod tests {
         ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
     };
     use datafusion::execution::SessionStateBuilder;
-    use datafusion::prelude::SessionContext;
+    use datafusion::prelude::{SessionConfig, SessionContext};
+    use datafusion_common::record_batch;
     use datafusion_datasource::file_format::format_as_file_type;
     use datafusion_expr::LogicalPlanBuilder;
     use datafusion_physical_plan::display::DisplayableExecutionPlan;
@@ -190,6 +191,103 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_dynamic_filter_pushdown() -> anyhow::Result<()> {
+        let factory = VortexFormatFactory::new();
+        let config = SessionConfig::default().with_target_partitions(1);
+        let mut session_state_builder = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config);
+        register_vortex_format_factory(factory, &mut session_state_builder);
+        let session = SessionContext::new_with_state(session_state_builder.build());
+
+        // Write the first table
+        let dir1 = TempDir::new()?;
+        let data = session.read_batch(
+            record_batch!(
+                ("a", Int32, vec![1, 2, 3]),
+                ("b", Utf8, vec!["x", "y", "z"])
+            )
+            .unwrap(),
+        )?;
+        let logical_plan = LogicalPlanBuilder::copy_to(
+            data.logical_plan().clone(),
+            dir1.path().to_str().unwrap().to_string(),
+            format_as_file_type(Arc::new(VortexFormatFactory::new())),
+            Default::default(),
+            vec![],
+        )?
+        .build()?;
+        session
+            .execute_logical_plan(logical_plan)
+            .await?
+            .collect()
+            .await?;
+
+        // Write the second table
+        let dir2 = TempDir::new()?;
+        let data = session.read_batch(
+            record_batch!(("a", Int32, vec![2, 3, 4]), ("c", Int32, vec![20, 30, 40])).unwrap(),
+        )?;
+        let logical_plan = LogicalPlanBuilder::copy_to(
+            data.logical_plan().clone(),
+            dir2.path().to_str().unwrap().to_string(),
+            format_as_file_type(Arc::new(VortexFormatFactory::new())),
+            Default::default(),
+            vec![],
+        )?
+        .build()?;
+        session
+            .execute_logical_plan(logical_plan)
+            .await?
+            .collect()
+            .await?;
+
+        // Register the written files as external tables so the join below reads them back
+        session
+            .sql(&format!(
+                "CREATE EXTERNAL TABLE t1 \
+                (a INT NOT NULL, b STRING) \
+                STORED AS vortex \
+                LOCATION '{}/';",
+                dir1.path().to_str().unwrap()
+            ))
+            .await?;
+        session
+            .sql(&format!(
+                "CREATE EXTERNAL TABLE t2 \
+                (a INT NOT NULL, c INT) \
+                STORED AS vortex \
+                LOCATION '{}/';",
+                dir2.path().to_str().unwrap()
+            ))
+            .await?;
+
+        let result = session
+            .sql(
+                "EXPLAIN ANALYZE SELECT t1.a, t1.b FROM t1 JOIN t2 ON t1.a = t2.a WHERE t2.c > 30;",
+            )
+            .await?
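+            // Collecting drives the query to completion; the rendered plan therefore
+            // reflects runtime state, including the dynamic filter the hash join
+            // installed on the probe-side scan.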
+            .collect()
+            .await?;
+
+        let plan = format!("{}", pretty_format_batches(&result)?);
+
+        println!("EXPLAIN ANALYZE:\n{}", plan);
+
+        let data_source_line = plan
+            .lines()
+            .filter(|line| line.contains("DataSourceExec"))
+            .nth(1)
+            .ok_or_else(|| {
+                vortex_err!("Plan should have 2 DataSourceExec nodes; the second is the probe side")
+            })?;
+        assert!(data_source_line.contains("DynamicFilterPhysicalExpr [ a@0 >= 1 AND a@0 <= 3 ]"));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn create_table_ordered_by() -> anyhow::Result<()> {
         let dir = TempDir::new().unwrap();
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 278affe8cd4..a072e8490b5 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -37,13 +37,8 @@ pub(crate) struct VortexOpener {
     pub object_store: Arc<dyn ObjectStore>,
     /// Projection by index of the file's columns
     pub projection: Option<Vec<usize>>,
-    /// Filter expression optimized for pushdown into Vortex scan operations.
-    /// This may be a subset of file_pruning_predicate containing only expressions
-    /// that Vortex can efficiently evaluate.
-    pub filter: Option<PhysicalExprRef>,
-    /// Filter expression used by DataFusion's FilePruner to eliminate files based on
-    /// statistics and partition values without opening them.
-    pub file_pruning_predicate: Option<PhysicalExprRef>,
+    /// Predicate expression used for file-level pruning and pushed down into Vortex scan operations.
+    pub predicate: Option<PhysicalExprRef>,
     pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
     /// Hive-style partitioning columns
@@ -64,8 +59,7 @@ impl FileOpener for VortexOpener {
     fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> DFResult<FileOpenFuture> {
         let object_store = self.object_store.clone();
         let projection = self.projection.clone();
-        let mut filter = self.filter.clone();
-        let file_pruning_predicate = self.file_pruning_predicate.clone();
+        let mut predicate = self.predicate.clone();
         let expr_adapter_factory = self.expr_adapter_factory.clone();
         let partition_fields = self.partition_fields.clone();
         let file_cache = self.file_cache.clone();
@@ -93,7 +87,8 @@ impl FileOpener for VortexOpener {
         // opening them based on:
         // - Partition column values (e.g., date=2024-01-01)
         // - File-level statistics (min/max values per column)
-        let mut file_pruner = file_pruning_predicate
+        let mut file_pruner = predicate
+            .clone()
             .map(|predicate| {
                 // Only create pruner if we have dynamic expressions or file statistics
                 // to work with. Static predicates without stats won't benefit from pruning.
@@ -140,7 +135,7 @@ impl FileOpener for VortexOpener {
 
         // The adapter rewrites the expression to the local file schema, allowing
         // for schema evolution and divergence between the table's schema and individual files.
-        filter = filter
+        predicate = predicate
             .map(|filter| {
                 let expr = expr_adapter_factory
                     .create(logical_schema.clone(), physical_file_schema.clone())
@@ -206,20 +201,30 @@ impl FileOpener for VortexOpener {
             );
         }
 
-        let filter = filter
+        let predicate = predicate
             .and_then(|f| {
                 let exprs = split_conjunction(&f)
                     .into_iter()
+                    // Dynamic expressions (e.g. hash join bounds) change while the query
+                    // runs; snapshot their current state so the scan evaluates a fixed
+                    // predicate. Expressions without a usable snapshot are dropped.
+                    .filter_map(|e| {
+                        if is_dynamic_physical_expr(&e) {
+                            e.snapshot().ok().flatten()
+                        } else {
+                            Some(Arc::clone(e))
+                        }
+                    })
                    .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
                    .collect::<Vec<_>>();
 
-                make_vortex_predicate(&exprs).transpose()
+                // TODO: make `make_vortex_predicate` accept `&[Arc<dyn PhysicalExpr>]` instead
+                // of `&[&Arc<dyn PhysicalExpr>]`, or maybe
+                // `impl Iterator<Item = Arc<dyn PhysicalExpr>>` or something like that?
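+                // Until then, re-borrow the owned expressions to match the current
+                // `&[&Arc<dyn PhysicalExpr>]` signature of `make_vortex_predicate`.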
+                let expr_refs = exprs.iter().collect::<Vec<_>>();
+                make_vortex_predicate(&expr_refs).transpose()
             })
             .transpose()
             .map_err(|e| DataFusionError::External(e.into()))?;
 
         if let Some(limit) = limit
-            && filter.is_none()
+            && predicate.is_none()
         {
             scan_builder = scan_builder.with_limit(limit);
         }
@@ -227,7 +232,7 @@
         let stream = scan_builder
             .with_metrics(metrics)
             .with_projection(projection_expr)
-            .with_some_filter(filter)
+            .with_some_filter(predicate)
             .with_ordered(has_output_ordering)
             .map(|chunk| chunk.to_struct().into_record_batch())
             .into_stream()
@@ -423,11 +429,10 @@ mod tests {
             Field::new("a", DataType::Int32, false),
         ]));
 
-        let make_opener = |filter| VortexOpener {
+        let make_opener = |predicate| VortexOpener {
             object_store: object_store.clone(),
             projection: Some([0].into()),
-            filter: Some(filter),
-            file_pruning_predicate: None,
+            predicate: Some(predicate),
             expr_adapter_factory: expr_adapter_factory.clone(),
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))],
@@ -504,11 +509,10 @@
         // Table schema can accommodate both files
         let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
 
-        let make_opener = |filter| VortexOpener {
+        let make_opener = |predicate| VortexOpener {
             object_store: object_store.clone(),
             projection: Some([0].into()),
-            filter: Some(filter),
-            file_pruning_predicate: None,
+            predicate: Some(predicate),
             expr_adapter_factory: expr_adapter_factory.clone(),
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             partition_fields: vec![],
diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs
index fc1fa76071b..725107c213f 100644
--- a/vortex-datafusion/src/persistent/source.rs
+++ b/vortex-datafusion/src/persistent/source.rs
@@ -18,7 +18,7 @@ use datafusion_physical_expr_adapter::{
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_plan::filter_pushdown::{
-    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
+    FilterPushdownPropagation, PushedDown,
 };
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::{DisplayFormatType, PhysicalExpr};
@@ -42,11 +42,8 @@ use crate::convert::exprs::can_be_pushed_down;
 pub struct VortexSource {
     pub(crate) file_cache: VortexFileCache,
     /// Combined predicate expression containing all filters from DataFusion query planning.
-    /// Used with FilePruner to skip files based on statistics and partition values.
-    pub(crate) full_predicate: Option<PhysicalExprRef>,
-    /// Subset of predicates that can be pushed down into Vortex scan operations.
-    /// These are expressions that Vortex can efficiently evaluate during scanning.
-    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
+    /// Used with FilePruner to skip files based on statistics and partition values, and pushed down into Vortex scans.
+    pub(crate) predicate: Option<PhysicalExprRef>,
     pub(crate) batch_size: Option<usize>,
     pub(crate) projected_statistics: Option<Statistics>,
     /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
@@ -66,8 +63,7 @@ impl VortexSource {
         Self {
             file_cache,
             metrics,
-            full_predicate: None,
-            vortex_predicate: None,
+            predicate: None,
             batch_size: None,
             projected_statistics: None,
             arrow_file_schema: None,
@@ -137,8 +133,7 @@ impl FileSource for VortexSource {
         let opener = VortexOpener {
             object_store,
             projection,
-            filter: self.vortex_predicate.clone(),
-            file_pruning_predicate: self.full_predicate.clone(),
+            predicate: self.predicate.clone(),
             expr_adapter_factory,
             schema_adapter_factory,
             partition_fields: base_config.table_partition_cols.clone(),
@@ -181,7 +176,7 @@ impl FileSource for VortexSource {
     }
 
     fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
-        self.vortex_predicate.clone()
+        self.predicate.clone()
     }
 
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
@@ -194,7 +189,7 @@ impl FileSource for VortexSource {
             .clone()
             .vortex_expect("projected_statistics must be set");
 
-        if self.vortex_predicate.is_some() {
+        if self.predicate.is_some() {
             Ok(statistics.to_inexact())
         } else {
             Ok(statistics)
@@ -208,13 +203,13 @@ impl FileSource for VortexSource {
     fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                if let Some(ref predicate) = self.vortex_predicate {
+                if let Some(ref predicate) = self.predicate {
                     write!(f, ", predicate: {predicate}")?;
                 }
             }
             // Use TreeRender style key=value formatting to display the predicate
             DisplayFormatType::TreeRender => {
-                if let Some(ref predicate) = self.vortex_predicate {
+                if let Some(ref predicate) = self.predicate {
                     writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
                 };
             }
@@ -233,59 +228,40 @@ impl FileSource for VortexSource {
             ));
         };
 
+        if filters.is_empty() {
+            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
+                vec![],
+            ));
+        }
+
         let mut source = self.clone();
 
         // Combine new filters with existing predicate for file pruning.
-        // This full predicate is used by FilePruner to eliminate files.
-        source.full_predicate = match source.full_predicate {
+        // This combined predicate is used by FilePruner to eliminate files and is also what we attempt to push down to Vortex.
+        source.predicate = match &source.predicate {
             Some(predicate) => Some(conjunction(
-                std::iter::once(predicate).chain(filters.clone()),
+                std::iter::once(predicate.clone()).chain(filters.clone()),
             )),
             None => Some(conjunction(filters.clone())),
         };
 
+        // Collect the filters we can *definitely* push down to Vortex with full evaluation.
+        // For these we can tell our caller that we handle them fully; they don't need to be
+        // re-evaluated upstream.
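+        // Filters rejected by `can_be_pushed_down` are reported back as `PushedDown::No`,
+        // but they remain part of `source.predicate` above and still contribute to
+        // file-level pruning.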
         let supported_filters = filters
             .into_iter()
             .map(|expr| {
                 if can_be_pushed_down(&expr, schema) {
-                    PushedDownPredicate::supported(expr)
+                    PushedDown::Yes
                 } else {
-                    PushedDownPredicate::unsupported(expr)
+                    PushedDown::No
                 }
             })
             .collect::<Vec<_>>();
 
-        if supported_filters
-            .iter()
-            .all(|p| matches!(p.discriminant, PushedDown::No))
-        {
-            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-                vec![PushedDown::No; supported_filters.len()],
-            )
-            .with_updated_node(Arc::new(source) as _));
-        }
-
-        let supported = supported_filters
-            .iter()
-            .filter_map(|p| match p.discriminant {
-                PushedDown::Yes => Some(&p.predicate),
-                PushedDown::No => None,
-            })
-            .cloned();
-
-        let predicate = match source.vortex_predicate {
-            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
-            None => conjunction(supported),
-        };
-
-        tracing::debug!(%predicate, "Saving predicate");
-
-        source.vortex_predicate = Some(predicate);
-
-        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-            supported_filters.iter().map(|f| f.discriminant).collect(),
+        Ok(
+            FilterPushdownPropagation::with_parent_pushdown_result(supported_filters)
+                .with_updated_node(Arc::new(source)),
         )
-        .with_updated_node(Arc::new(source) as _))
     }
 
     fn with_schema_adapter_factory(