-
Notifications
You must be signed in to change notification settings - Fork 118
support DynamicFilterPhysicalExpr #4961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,13 +37,8 @@ pub(crate) struct VortexOpener { | |
| pub object_store: Arc<dyn ObjectStore>, | ||
| /// Projection by index of the file's columns | ||
| pub projection: Option<Arc<[usize]>>, | ||
| /// Filter expression optimized for pushdown into Vortex scan operations. | ||
| /// This may be a subset of file_pruning_predicate containing only expressions | ||
| /// that Vortex can efficiently evaluate. | ||
| pub filter: Option<PhysicalExprRef>, | ||
| /// Filter expression used by DataFusion's FilePruner to eliminate files based on | ||
| /// statistics and partition values without opening them. | ||
| pub file_pruning_predicate: Option<PhysicalExprRef>, | ||
| /// Predicate expression optimized for pushdown into Vortex scan operations. | ||
| pub predicate: Option<PhysicalExprRef>, | ||
| pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>, | ||
| pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, | ||
| /// Hive-style partitioning columns | ||
|
|
@@ -64,8 +59,7 @@ impl FileOpener for VortexOpener { | |
| fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> DFResult<FileOpenFuture> { | ||
| let object_store = self.object_store.clone(); | ||
| let projection = self.projection.clone(); | ||
| let mut filter = self.filter.clone(); | ||
| let file_pruning_predicate = self.file_pruning_predicate.clone(); | ||
| let mut predicate = self.predicate.clone(); | ||
| let expr_adapter_factory = self.expr_adapter_factory.clone(); | ||
| let partition_fields = self.partition_fields.clone(); | ||
| let file_cache = self.file_cache.clone(); | ||
|
|
@@ -93,7 +87,8 @@ impl FileOpener for VortexOpener { | |
| // opening them based on: | ||
| // - Partition column values (e.g., date=2024-01-01) | ||
| // - File-level statistics (min/max values per column) | ||
| let mut file_pruner = file_pruning_predicate | ||
| let mut file_pruner = predicate | ||
| .clone() | ||
| .map(|predicate| { | ||
| // Only create pruner if we have dynamic expressions or file statistics | ||
| // to work with. Static predicates without stats won't benefit from pruning. | ||
|
|
@@ -140,7 +135,7 @@ impl FileOpener for VortexOpener { | |
|
|
||
| // The adapter rewrites the expression to the local file schema, allowing | ||
| // for schema evolution and divergence between the table's schema and individual files. | ||
| filter = filter | ||
| predicate = predicate | ||
| .map(|filter| { | ||
| let expr = expr_adapter_factory | ||
| .create(logical_schema.clone(), physical_file_schema.clone()) | ||
|
|
@@ -206,28 +201,38 @@ impl FileOpener for VortexOpener { | |
| ); | ||
| } | ||
|
|
||
| let filter = filter | ||
| let predicate = predicate | ||
| .and_then(|f| { | ||
| let exprs = split_conjunction(&f) | ||
| .into_iter() | ||
| .filter_map(|e| { | ||
| if is_dynamic_physical_expr(&e) { | ||
| e.snapshot().ok().flatten() | ||
| } else { | ||
| Some(Arc::clone(e)) | ||
| } | ||
|
Comment on lines
+209
to
+213
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure if it's possible or worth it but you could do this more lazily, or wrap the DataFusion dynamic filter in a Vortex dynamic filter and call
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I roughly tried to do that in some old PR, but I don't remember why I stopped pushing it. This seems like a good starting point, and we can experiment with it in the future to see which one is better. |
||
| }) | ||
| .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema)) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| make_vortex_predicate(&exprs).transpose() | ||
| // TODO: make `make_vortex_predicate` accept `&[Arc<dyn PhysicalExpr>]` instead of `&[&Arc<dyn PhysicalExpr>]` | ||
| // or maybe `impl Iterator<Item=impl AsRef<Arc<dyn PhysicalExpr>>>` or something like that? | ||
| let expr_refs = exprs.iter().collect::<Vec<_>>(); | ||
| make_vortex_predicate(&expr_refs).transpose() | ||
| }) | ||
| .transpose() | ||
| .map_err(|e| DataFusionError::External(e.into()))?; | ||
|
|
||
| if let Some(limit) = limit | ||
| && filter.is_none() | ||
| && predicate.is_none() | ||
| { | ||
| scan_builder = scan_builder.with_limit(limit); | ||
| } | ||
|
|
||
| let stream = scan_builder | ||
| .with_metrics(metrics) | ||
| .with_projection(projection_expr) | ||
| .with_some_filter(filter) | ||
| .with_some_filter(predicate) | ||
| .with_ordered(has_output_ordering) | ||
| .map(|chunk| chunk.to_struct().into_record_batch()) | ||
| .into_stream() | ||
|
|
@@ -263,6 +268,7 @@ impl FileOpener for VortexOpener { | |
| }) | ||
| .try_flatten() | ||
| .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b))) | ||
| .map(|b| b) | ||
| .boxed(); | ||
|
|
||
| Ok(stream) | ||
|
|
@@ -423,11 +429,10 @@ mod tests { | |
| Field::new("a", DataType::Int32, false), | ||
| ])); | ||
|
|
||
| let make_opener = |filter| VortexOpener { | ||
| let make_opener = |predicate| VortexOpener { | ||
| object_store: object_store.clone(), | ||
| projection: Some([0].into()), | ||
| filter: Some(filter), | ||
| file_pruning_predicate: None, | ||
| predicate: Some(predicate), | ||
| expr_adapter_factory: expr_adapter_factory.clone(), | ||
| schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), | ||
| partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))], | ||
|
|
@@ -504,11 +509,10 @@ mod tests { | |
| // Table schema can accommodate both files | ||
| let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); | ||
|
|
||
| let make_opener = |filter| VortexOpener { | ||
| let make_opener = |predicate| VortexOpener { | ||
| object_store: object_store.clone(), | ||
| projection: Some([0].into()), | ||
| filter: Some(filter), | ||
| file_pruning_predicate: None, | ||
| predicate: Some(predicate), | ||
| expr_adapter_factory: expr_adapter_factory.clone(), | ||
| schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), | ||
| partition_fields: vec![], | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what other assertions we could make. I don't think Vortex gives any metrics on rows pruned, etc. And the Vortex expression isn't saved anywhere / is not in the plan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put in a print statement and can see that an appropriate Vortex predicate is being created, but can't verify that it's doing what's expected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I have a better idea here right now, I need to figure out how to improve the overall testability