vortex-datafusion/src/persistent/mod.rs (99 additions, 1 deletion)
@@ -41,7 +41,8 @@ mod tests {
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::SessionStateBuilder;
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::record_batch;
use datafusion_datasource::file_format::format_as_file_type;
use datafusion_expr::LogicalPlanBuilder;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
@@ -190,6 +191,103 @@ mod tests {
Ok(())
}

+#[tokio::test]
+async fn test_dynamic_filter_pushdown() -> anyhow::Result<()> {
+let factory = VortexFormatFactory::new();
+let config = SessionConfig::default().with_target_partitions(1);
+let mut session_state_builder = SessionStateBuilder::new()
+.with_default_features()
+.with_config(config);
+register_vortex_format_factory(factory, &mut session_state_builder);
+let session = SessionContext::new_with_state(session_state_builder.build());
+
+// Write the first table
+let dir1 = TempDir::new()?;
+let data = session.read_batch(
+record_batch!(
+("a", Int32, vec![1, 2, 3]),
+("b", Utf8, vec!["x", "y", "z"])
+)
+.unwrap(),
+)?;
+let logical_plan = LogicalPlanBuilder::copy_to(
+data.logical_plan().clone(),
+dir1.path().to_str().unwrap().to_string(),
+format_as_file_type(Arc::new(VortexFormatFactory::new())),
+Default::default(),
+vec![],
+)?
+.build()?;
+session
+.execute_logical_plan(logical_plan)
+.await?
+.collect()
+.await?;
+
+// Write the second table
+let dir2 = TempDir::new()?;
+let data = session.read_batch(
+record_batch!(("a", Int32, vec![2, 3, 4]), ("c", Int32, vec![20, 30, 40])).unwrap(),
+)?;
+let logical_plan = LogicalPlanBuilder::copy_to(
+data.logical_plan().clone(),
+dir2.path().to_str().unwrap().to_string(),
+format_as_file_type(Arc::new(VortexFormatFactory::new())),
+Default::default(),
+vec![],
+)?
+.build()?;
+session
+.execute_logical_plan(logical_plan)
+.await?
+.collect()
+.await?;
+
+// Validate the output by reading back the written files
+session
+.sql(&format!(
+"CREATE EXTERNAL TABLE t1 \
+(a INT NOT NULL, b STRING) \
+STORED AS vortex \
+LOCATION '{}/';",
+dir1.path().to_str().unwrap()
+))
+.await?;
+session
+.sql(&format!(
+"CREATE EXTERNAL TABLE t2 \
+(a INT NOT NULL, c INT) \
+STORED AS vortex \
+LOCATION '{}/';",
+dir2.path().to_str().unwrap()
+))
+.await?;
+
+let result = session
+.sql(
+"EXPLAIN ANALYZE SELECT t1.a, t1.b FROM t1 JOIN t2 ON t1.a = t2.a WHERE t2.c > 30;",
+)
+.await?
+.collect()
+.await?;
+
+let plan = format!("{}", pretty_format_batches(&result)?);
+
+println!("EXPLAIN ANALYZE:\n{}", plan);
+
+let data_source_line = plan
+.lines()
+.filter(|line| line.contains("DataSourceExec"))
+.nth(1)
+.ok_or_else(|| {
+vortex_err!("Plan should have two DataSourceExec nodes; the second is the probe side")
+})?;
+assert!(data_source_line.contains("DynamicFilterPhysicalExpr [ a@0 >= 1 AND a@0 <= 3 ]"));
Author: I'm not sure what other assertions we could make. I don't think Vortex gives any metrics on rows pruned, etc. And the Vortex expression isn't saved anywhere / is not in the plan.

Author: I put in a print statement and can see that an appropriate Vortex predicate is being created, but I can't verify that it's doing what's expected.

Contributor: I'm not sure I have a better idea here right now; I need to figure out how to improve the overall testability.
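One further check the thread leaves open, sketched here as a hypothetical addition rather than part of the PR: assert on the query result itself. With the data above, `t2.c > 30` keeps only the row with `a = 4`, and `t1` has no row with `a = 4`, so the join must come back empty no matter how aggressively the scan prunes.

// Hypothetical extra assertion (not in the PR): pushdown must not change the
// result, and for this data the filtered join is empty (t2.c > 30 leaves only
// a = 4, which has no match in t1). Any surviving row would signal a bug.
let rows = session
    .sql("SELECT t1.a, t1.b FROM t1 JOIN t2 ON t1.a = t2.a WHERE t2.c > 30;")
    .await?
    .collect()
    .await?;
assert_eq!(rows.iter().map(|b| b.num_rows()).sum::<usize>(), 0);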


+Ok(())
+}

#[tokio::test]
async fn create_table_ordered_by() -> anyhow::Result<()> {
let dir = TempDir::new().unwrap();
vortex-datafusion/src/persistent/opener.rs (25 additions, 21 deletions)
@@ -37,13 +37,8 @@ pub(crate) struct VortexOpener {
pub object_store: Arc<dyn ObjectStore>,
/// Projection by index of the file's columns
pub projection: Option<Arc<[usize]>>,
-/// Filter expression optimized for pushdown into Vortex scan operations.
-/// This may be a subset of file_pruning_predicate containing only expressions
-/// that Vortex can efficiently evaluate.
-pub filter: Option<PhysicalExprRef>,
-/// Filter expression used by DataFusion's FilePruner to eliminate files based on
-/// statistics and partition values without opening them.
-pub file_pruning_predicate: Option<PhysicalExprRef>,
+/// Predicate expression optimized for pushdown into Vortex scan operations.
+pub predicate: Option<PhysicalExprRef>,
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
/// Hive-style partitioning columns
@@ -64,8 +59,7 @@ impl FileOpener for VortexOpener {
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> DFResult<FileOpenFuture> {
let object_store = self.object_store.clone();
let projection = self.projection.clone();
-let mut filter = self.filter.clone();
-let file_pruning_predicate = self.file_pruning_predicate.clone();
+let mut predicate = self.predicate.clone();
let expr_adapter_factory = self.expr_adapter_factory.clone();
let partition_fields = self.partition_fields.clone();
let file_cache = self.file_cache.clone();
@@ -93,7 +87,8 @@ impl FileOpener for VortexOpener {
// opening them based on:
// - Partition column values (e.g., date=2024-01-01)
// - File-level statistics (min/max values per column)
-let mut file_pruner = file_pruning_predicate
+let mut file_pruner = predicate
+.clone()
.map(|predicate| {
// Only create pruner if we have dynamic expressions or file statistics
// to work with. Static predicates without stats won't benefit from pruning.
@@ -140,7 +135,7 @@

// The adapter rewrites the expression to the local file schema, allowing
// for schema evolution and divergence between the table's schema and individual files.
-filter = filter
+predicate = predicate
.map(|filter| {
let expr = expr_adapter_factory
.create(logical_schema.clone(), physical_file_schema.clone())
@@ -206,28 +201,38 @@
);
}

-let filter = filter
+let predicate = predicate
.and_then(|f| {
let exprs = split_conjunction(&f)
.into_iter()
+.filter_map(|e| {
+if is_dynamic_physical_expr(&e) {
+e.snapshot().ok().flatten()
+} else {
+Some(Arc::clone(e))
+}
Comment on lines +209 to +213:

Author: Not sure if it's possible or worth it, but you could do this more lazily, or wrap the DataFusion dynamic filter in a Vortex dynamic filter and call DynamicFilterPhysicalExpr::current() or PhysicalExpr::snapshot() on each RecordBatch (that would be more overhead, but it would also kick in sooner on scans that touch very large files).

Contributor: I roughly tried to do that in some old PR, but I don't remember why I stopped pushing it. This seems like a good starting point, and we can experiment with it in the future to see which one is better.
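A rough sketch of the lazier variant described above, reusing helpers that already appear in this file (`split_conjunction`, `is_dynamic_physical_expr`, `can_be_pushed_down`, `make_vortex_predicate`); the function name `refresh_vortex_predicate`, its parameter types, and its return type are assumptions inferred from how those helpers are called here, not an API this PR adds.

// Hypothetical per-batch refresh (not the PR's implementation): re-snapshot
// the possibly-dynamic filter each time, so bounds tightened by the build side
// apply mid-scan instead of only at file-open time.
fn refresh_vortex_predicate(
    filter: &PhysicalExprRef,
    predicate_file_schema: &Schema,
) -> VortexResult<Option<Arc<dyn VortexExpr>>> {
    let exprs = split_conjunction(filter)
        .into_iter()
        .filter_map(|e| {
            if is_dynamic_physical_expr(&e) {
                // Take the filter's *current* state for this batch; a dynamic
                // filter may be tighter now than when the file was opened.
                e.snapshot().ok().flatten()
            } else {
                Some(Arc::clone(e))
            }
        })
        .filter(|expr| can_be_pushed_down(expr, predicate_file_schema))
        .collect::<Vec<_>>();
    let expr_refs = exprs.iter().collect::<Vec<_>>();
    // None means nothing pushdown-eligible remains after snapshotting.
    make_vortex_predicate(&expr_refs)
}

Calling something like this from the per-batch mapping would pick up bounds that the join's build side publishes mid-scan, at the cost of rebuilding the predicate on every batch.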

+})
.filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
.collect::<Vec<_>>();

-make_vortex_predicate(&exprs).transpose()
+// TODO: make `make_vortex_predicate` accept `&[Arc<dyn PhysicalExpr>]` instead of `&[&Arc<dyn PhysicalExpr>]`,
+// or maybe `impl Iterator<Item = impl AsRef<Arc<dyn PhysicalExpr>>>` or something like that?
+let expr_refs = exprs.iter().collect::<Vec<_>>();
+make_vortex_predicate(&expr_refs).transpose()
})
.transpose()
.map_err(|e| DataFusionError::External(e.into()))?;

if let Some(limit) = limit
-&& filter.is_none()
+&& predicate.is_none()
{
scan_builder = scan_builder.with_limit(limit);
}

let stream = scan_builder
.with_metrics(metrics)
.with_projection(projection_expr)
-.with_some_filter(filter)
+.with_some_filter(predicate)
.with_ordered(has_output_ordering)
.map(|chunk| chunk.to_struct().into_record_batch())
.into_stream()
@@ -263,6 +268,7 @@ impl FileOpener for VortexOpener {
})
.try_flatten()
.map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
.boxed();

Ok(stream)
@@ -423,11 +429,10 @@ mod tests {
Field::new("a", DataType::Int32, false),
]));

-let make_opener = |filter| VortexOpener {
+let make_opener = |predicate| VortexOpener {
object_store: object_store.clone(),
projection: Some([0].into()),
-filter: Some(filter),
-file_pruning_predicate: None,
+predicate: Some(predicate),
expr_adapter_factory: expr_adapter_factory.clone(),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))],
@@ -504,11 +509,10 @@
// Table schema can accommodate both files
let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

-let make_opener = |filter| VortexOpener {
+let make_opener = |predicate| VortexOpener {
object_store: object_store.clone(),
projection: Some([0].into()),
-filter: Some(filter),
-file_pruning_predicate: None,
+predicate: Some(predicate),
expr_adapter_factory: expr_adapter_factory.clone(),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![],
vortex-datafusion/src/persistent/source.rs (25 additions, 49 deletions)
@@ -18,7 +18,7 @@ use datafusion_physical_expr_adapter::{
};
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_plan::filter_pushdown::{
-FilterPushdownPropagation, PushedDown, PushedDownPredicate,
+FilterPushdownPropagation, PushedDown,
};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayFormatType, PhysicalExpr};
@@ -42,11 +42,8 @@ use crate::convert::exprs::can_be_pushed_down;
pub struct VortexSource {
pub(crate) file_cache: VortexFileCache,
/// Combined predicate expression containing all filters from DataFusion query planning.
-/// Used with FilePruner to skip files based on statistics and partition values.
-pub(crate) full_predicate: Option<PhysicalExprRef>,
-/// Subset of predicates that can be pushed down into Vortex scan operations.
-/// These are expressions that Vortex can efficiently evaluate during scanning.
-pub(crate) vortex_predicate: Option<PhysicalExprRef>,
+/// Used with the FilePruner to skip files based on statistics and partition values, and pushed down into Vortex scans.
+pub(crate) predicate: Option<PhysicalExprRef>,
pub(crate) batch_size: Option<usize>,
pub(crate) projected_statistics: Option<Statistics>,
/// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
@@ -66,8 +63,7 @@ impl VortexSource {
Self {
file_cache,
metrics,
-full_predicate: None,
-vortex_predicate: None,
+predicate: None,
batch_size: None,
projected_statistics: None,
arrow_file_schema: None,
@@ -137,8 +133,7 @@ impl FileSource for VortexSource {
let opener = VortexOpener {
object_store,
projection,
-filter: self.vortex_predicate.clone(),
-file_pruning_predicate: self.full_predicate.clone(),
+predicate: self.predicate.clone(),
expr_adapter_factory,
schema_adapter_factory,
partition_fields: base_config.table_partition_cols.clone(),
@@ -181,7 +176,7 @@ impl FileSource for VortexSource {
}

fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
-self.vortex_predicate.clone()
+self.predicate.clone()
}

fn metrics(&self) -> &ExecutionPlanMetricsSet {
@@ -194,7 +189,7 @@ impl FileSource for VortexSource {
.clone()
.vortex_expect("projected_statistics must be set");

-if self.vortex_predicate.is_some() {
+if self.predicate.is_some() {
Ok(statistics.to_inexact())
} else {
Ok(statistics)
@@ -208,13 +203,13 @@ impl FileSource for VortexSource {
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
-if let Some(ref predicate) = self.vortex_predicate {
+if let Some(ref predicate) = self.predicate {
write!(f, ", predicate: {predicate}")?;
}
}
// Use TreeRender style key=value formatting to display the predicate
DisplayFormatType::TreeRender => {
-if let Some(ref predicate) = self.vortex_predicate {
+if let Some(ref predicate) = self.predicate {
writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
};
}
@@ -233,59 +228,40 @@ impl FileSource for VortexSource {
));
};

+if filters.is_empty() {
+return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
+vec![],
+));
+}

let mut source = self.clone();

// Combine new filters with existing predicate for file pruning.
-// This full predicate is used by FilePruner to eliminate files.
-source.full_predicate = match source.full_predicate {
+// This combined predicate is used by the FilePruner to eliminate files, and it is also what we attempt to push down to Vortex.
+source.predicate = match &source.predicate {
Some(predicate) => Some(conjunction(
-std::iter::once(predicate).chain(filters.clone()),
+std::iter::once(predicate.clone()).chain(filters.clone()),
)),
None => Some(conjunction(filters.clone())),
};

+// Collect which filters we can *definitely* push down to Vortex with full evaluation.
+// For these filters we can tell our caller that we will handle them fully; they don't need to be evaluated upstream.
let supported_filters = filters
.into_iter()
.map(|expr| {
if can_be_pushed_down(&expr, schema) {
-PushedDownPredicate::supported(expr)
+PushedDown::Yes
} else {
-PushedDownPredicate::unsupported(expr)
+PushedDown::No
}
})
.collect::<Vec<_>>();

-if supported_filters
-.iter()
-.all(|p| matches!(p.discriminant, PushedDown::No))
-{
-return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-vec![PushedDown::No; supported_filters.len()],
-)
-.with_updated_node(Arc::new(source) as _));
-}
-
-let supported = supported_filters
-.iter()
-.filter_map(|p| match p.discriminant {
-PushedDown::Yes => Some(&p.predicate),
-PushedDown::No => None,
-})
-.cloned();
-
-let predicate = match source.vortex_predicate {
-Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
-None => conjunction(supported),
-};
-
-tracing::debug!(%predicate, "Saving predicate");
-
-source.vortex_predicate = Some(predicate);
-
-Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-supported_filters.iter().map(|f| f.discriminant).collect(),
+Ok(
+FilterPushdownPropagation::with_parent_pushdown_result(supported_filters)
+.with_updated_node(Arc::new(source)),
+)
-.with_updated_node(Arc::new(source) as _))
}

fn with_schema_adapter_factory(