
Add late pruning of file based on file level statistics #16014


Open

wants to merge 12 commits into main from late-pruning-files

Conversation

adriangb
Contributor

@adriangb adriangb commented May 10, 2025

@github-actions github-actions bot added the optimizer, core and datasource labels May 10, 2025
@adriangb
Contributor Author

A couple of thoughts:

  1. Needs cleanup.
  2. Not sure how to construct the empty stream.
  3. It might be nice to implement pruning for Vec<Statistics> where each statistic represents an arbitrary container (e.g. partition or file).

@alamb
Contributor

alamb commented May 11, 2025

It might be nice to implement pruning for Vec<Statistics> where each statistic represents an arbitrary container (e.g. partition or file).

Yes this would be super nice -- the more we can do to consolidate statistics / pruning the better off the code will be I think. Right now it is kind of scattered in several places
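
(For illustration, a hedged sketch of what such a Vec<Statistics>-backed implementation of the existing PruningStatistics trait could look like; the struct and helper names here are made up and the import paths are approximate. Each Statistics is one container, and each container's exact min/max becomes one row of the pruning arrays.)

use std::collections::HashSet;

use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::SchemaRef;
use datafusion_common::stats::Precision;
use datafusion_common::{Column, ScalarValue, Statistics};
use datafusion_physical_optimizer::pruning::PruningStatistics;

/// One `Statistics` per container (e.g. per file or per partition).
struct VecStatistics {
    statistics: Vec<Statistics>,
    schema: SchemaRef,
}

impl VecStatistics {
    /// Collect one scalar per container for `column`, or None if any
    /// container lacks an exact value (conservatively disabling pruning).
    fn gather(
        &self,
        column: &Column,
        get: impl Fn(&Statistics, usize) -> Option<ScalarValue>,
    ) -> Option<ArrayRef> {
        let idx = self.schema.index_of(column.name()).ok()?;
        let values: Option<Vec<_>> =
            self.statistics.iter().map(|s| get(s, idx)).collect();
        ScalarValue::iter_to_array(values?).ok()
    }
}

impl PruningStatistics for VecStatistics {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        self.gather(column, |s, i| match &s.column_statistics[i].min_value {
            Precision::Exact(v) => Some(v.clone()),
            _ => None,
        })
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.gather(column, |s, i| match &s.column_statistics[i].max_value {
            Precision::Exact(v) => Some(v.clone()),
            _ => None,
        })
    }

    fn num_containers(&self) -> usize {
        self.statistics.len()
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None // could be derived from ColumnStatistics::null_count
    }

    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn contained(&self, _c: &Column, _v: &HashSet<ScalarValue>) -> Option<BooleanArray> {
        None
    }
}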

@alamb
Contributor

alamb commented May 11, 2025

Not sure how to construct the empty stream.

You can use something like https://docs.rs/futures/latest/futures/stream/fn.iter.html perhaps -- like futures::stream::iter(vec![]) for example 🤔
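
A minimal sketch of that (the only subtlety is that an empty Vec gives the compiler no inference hints, so the item type must be spelled out):

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use futures::stream::{self, BoxStream, StreamExt};

// Build an empty stream with the item type a FileOpenFuture stream yields.
fn empty_batches() -> BoxStream<'static, Result<RecordBatch>> {
    stream::iter(Vec::<Result<RecordBatch>>::new()).boxed()
    // futures::stream::empty() would work equally well here.
}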

@@ -367,7 +368,7 @@ impl Default for OnError {
pub trait FileOpener: Unpin + Send + Sync {
/// Asynchronously open the specified file and return a stream
/// of [`RecordBatch`]
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
Contributor

Isn't it sufficient to provide only file statistics? PartitionedFile seems like overkill to me

Contributor Author

@adriangb adriangb May 11, 2025

Maybe? But I feel like since we have the partitioned file we might as well pass it in. Maybe we use it in the future to enable optimizations that use the partition values (e.g. late pruning based on partition values, including partition values in the scan so that more filters can be evaluated, etc.)

Contributor

I think using PartitionedFile as the "data we have at plan time" including statistics and potentially information about size, encryption, special indexes, etc makes a lot of sense

Contributor

Maybe? But I feel like since we have the partitioned file we might as well pass it in. Maybe we use it in the future to enable optimizations that use the partition values (e.g. late pruning based on partition values, including partition values in the scan so that more filters can be evaluated, etc.)

I believe these can also be inferred from statistics in a more generalized fashion (I don't know whether partition columns exist in column_statistics now), but it's not a big deal, we can keep this 👍🏻

Contributor

Can you please update the documentation for open() to mention that file has plan-time per-file information (such as statistics) and leave a doc link back?

Contributor

@berkaysynnada berkaysynnada left a comment

The idea makes a lot of sense. I have one implementation suggestion. Thanks again @adriangb

@adriangb adriangb marked this pull request as ready for review May 11, 2025 23:09
@adriangb adriangb force-pushed the late-pruning-files branch from 0e03bdc to 94726cc on May 11, 2025 23:10
@adriangb
Contributor Author

@alamb please review again; I implemented it and added a test 😄

(Some(stats), Some(predicate)) => {
let pruning_predicate = build_pruning_predicate(
Arc::clone(predicate),
&self.table_schema,
Member

Should it use table_schema here?

Comment on lines 93 to 94
match (&file.statistics, &self.predicate) {
(Some(stats), Some(predicate)) => {
Member

Given that there is only one branch, I suggest using if let (Some(_), Some(_)) = xxx {} here.
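
i.e. something like the following (a fragment sketching the suggestion; this is the shape the code takes later in the thread):

if let (Some(stats), Some(predicate)) = (&file.statistics, &self.predicate) {
    // build the pruning predicate and skip the file if it prunes
}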

Contributor

@alamb alamb left a comment

Very cool -- I think this is very close

}
}

/// Returns [`BooleanArray`] where each row represents information known
Contributor

this comment can probably be trimmed with a link back to the original trait source

@@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>(
})
}

/// Prune a set of containers represented by their statistics.
Contributor

This is a nice structure -- I think it makes lots of sense and I agree 100%.

Specifically, I thought there was already code that pruned individual files based on statistics but I could not find any in ListingTable (we have something like this in influxdb_iox).

My opinion is that if we are going to put this code into the DataFusion codebase we should:

  1. Ensure that it helps as many users as possible
  2. Make sure it is executed as much as possible (to ensure test coverage)

Thus, what do you think about using the PrunableStatistics to prune the FileGroup in ListingTable here:

https://github.com/apache/datafusion/blob/55ba4cadce5ea99de4361929226f1c99cfc94450/datafusion/core/src/datasource/listing/table.rs#L1117-L1116

?

Pruning on statistics during plan time would potentially be redundant with also trying to prune again during opening, but it would reduce the files earlier in the plan

Contributor Author

How about I bundle in the PartitionValues somehow and then we can re-use and compose that?
Specifically:

  • TableProviders use just the partition values
  • ParquetOpener combines both
  • Something else can use just the stats

Contributor Author

Pruning on statistics during plan time would potentially be redundant with also trying to prune again during opening, but it would reduce the files earlier in the plan

Yeah I don't think it's redundant: you either prune or you don't. If we prune earlier the files don't make it this far. If we don't we may now be able to prune them. What's redundant is if there are no changes to the filters (i.e. no dynamic filters), but that sounds both hard to track and like a possible future optimization 😄

Contributor

kk

/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
/// and [`Self::row_counts`].
fn num_containers(&self) -> usize {
1
Contributor

this should be self.statistics.len(), right?

@adriangb
Contributor Author

@alamb I pushed 4607643 which adds some nice APIs for partition values. In particular I think it's important to have a way to prune based on partition values + file level statistics (#15935).

However I can't implement it for ListingTable since the trait is defined in physical-optimizer. Can we move the trait somewhere upstream?

@alamb
Contributor

alamb commented May 13, 2025

However I can't implement it for ListingTable since the trait is defined in physical-optimizer. Can we move the trait somewhere upstream?

Maybe it is time to make a datafusion-pruning crate that has all the PruningPredicate and related infrastructure 🤔

@alamb
Contributor

alamb commented May 13, 2025

FYI @xudong963 I think this is relevant to your work on statistics / partition pruning as well

@adriangb
Contributor Author

However I can't implement it for ListingTable since the trait is defined in physical-optimizer. Can we move the trait somewhere upstream?

Maybe it is time to make a datafusion-pruning crate that has all the PruningPredicate and related infrastructure 🤔

Seems reasonable to me. I guess it'd be at the same level as PhysicalExpr and such.

@adriangb
Contributor Author

Moving to datafusion_common works pretty well, I think that's easier than making a new crate.

Next hurdle: at this point we've long lost information on the actual table schema / partition files. ParquetOpener::table_schema is actually the file schema and we have no way to back out the partition columns.
Given that PartitionedFile carries around partition_values: Vec<ScalarValue> I'd recommend either:

  1. Changing PartitionedFile::partition_values to Vec<(String, ScalarValue)>.
  2. Adding PartitionedFile::partition_schema.
  3. Piping down table_schema into ParquetSource and later ParquetOpener.

I think any of these also sets us up to refactor how the partition filters actually get applied (i.e. we don't have to inject them in the FileScan). But maybe that's not desirable because every format would have to implement this on their own; in that case we pipe them into ParquetOpener for pruning and still inject them in the scan (it should be cheapish).

@alamb any preference?

@xudong963 xudong963 self-requested a review May 14, 2025 14:24
Member

@xudong963 xudong963 left a comment

Generally LGTM, thank you

if let (Some(stats), Some(predicate)) = (&file.statistics, &self.predicate) {
let pruning_predicate = build_pruning_predicate(
Arc::clone(predicate),
&self.table_schema,
Member

Is it reasonable to use table_schema here?

Contributor Author

It's the only schema we have. And it's not even really the table schema, the name is misleading for historical reasons.

Member

It'd be better to add some notes about it. (I often get confused when reading the parquet code, all the kinds of schemas, lol)

Contributor Author

// Note about schemas: we are actually dealing with **3 different schemas** here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
😄
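
(To make that concrete, a small hypothetical example of the three schemas for a table hive-partitioned on a made-up date column; none of these names come from the PR:)

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Table schema (what `SELECT *` returns): file columns + partition columns.
    let table_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("date", DataType::Utf8, false), // hive partition column
    ]);

    // "Virtual" file schema: the table schema minus the partition columns;
    // each file's schema is coerced to this.
    let virtual_file_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);

    // Physical file schema: whatever the parquet file actually stores, e.g.
    // an older file written before `name` was added.
    let physical_file_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
    ]);

    let _ = (table_schema, virtual_file_schema, physical_file_schema);
}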

@adriangb
Contributor Author

I think the next step here is to resolve #16014 (comment)

In my mind it makes sense to both push down the information and continue to have the ability to do it after the scan.
The direction DataFusion seems to be heading in is to add whatever functionality is needed to specialize readers for the most optimal performance (in this case by doing late pruning of files / partitions and being able to evaluate filters that mix partition columns and file columns during the scan) while preserving the ability to fall back to more general approaches (FilterExec, evaluating mixed filters after the scan) for sources that don't support this advanced functionality.

@alamb
Contributor

alamb commented May 14, 2025

Moving to datafusion_common works pretty well, I think that's easier than making a new crate.

I think we should try and avoid moving everything to datafusion_common. Since the pruning stuff relies on PhysicalExpr I don't think we can directly put it in datafusion_common

Next hurdle: at this point we've long lost information on the actual table schema / partition files. ParquetOpener::table_schema is actually the file schema and we have no way to back out the partition columns. Given that PartitionedFile carries around partition_values: Vec<ScalarValue> I'd recommend either:

  1. Changing PartitionedFile::partition_values to Vec<(String, ScalarValue)>.
  2. Adding PartitionedFile::partition_schema.
  3. Piping down table_schema into ParquetSource and later ParquetOpener.

I think any of these also sets us up to refactor how the partition filters actually get applied (i.e. we don't have to inject them in the FileScan). But maybe that's not desirable because every format would have to implement this on their own; in that case we pipe them into ParquetOpener for pruning and still inject them in the scan (it should be cheapish).

@alamb any preference?

  1. Changing PartitionedFile::partition_values to Vec<(String, ScalarValue)>.

I think this sounds like the most straightforward thing to me and the easiest way to get the required information

Seems like FileScanConfig already has table_partition_cols.

Maybe we can do something like this (change to use a FieldRef rather than Field to avoid copies):

pub struct PartitionedFile {
...
    pub partition_values: Vec<ScalarValue>,
...
}

to

pub struct PartitionedFile {
...
    pub partition_values: Vec<(FieldRef, ScalarValue)>,
...
}
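
(For illustration, a hedged sketch of what carrying the field alongside the value enables; the `date` column is made up. The partition schema can be rebuilt from the file entry alone, which would otherwise require piping the table schema down:)

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion_common::ScalarValue;

fn main() {
    // A file under a path like `.../date=2025-05-10/part-0.parquet` (hypothetical).
    let partition_values: Vec<(FieldRef, ScalarValue)> = vec![(
        Arc::new(Field::new("date", DataType::Utf8, false)),
        ScalarValue::from("2025-05-10"),
    )];

    // Recover the partition schema directly from the stored pairs.
    let partition_schema = Schema::new(
        partition_values
            .iter()
            .map(|(field, _)| Arc::clone(field))
            .collect::<Vec<_>>(),
    );
    assert_eq!(partition_schema.fields().len(), 1);
}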

@alamb
Contributor

alamb commented May 14, 2025

BTW the other thing I somewhat worry about with reapplying pruning during file opening is that it is in the critical path and will directly add to the query latency. I wonder if there is some way to ensure we hide it behind IO if possible (aka make sure we are applying the extra pruning while the next file is opened rather than waiting to do it before starting that IO).

@adriangb
Contributor Author

Since the pruning stuff relies on PhysicalExpr I don't think we can directly put it in datafusion_common

The stuff I'm moving doesn't 😄. It's basically just the PruningStatistics trait.

Maybe we can do something like this (change to use a FieldRef rather than Field to avoid copies):

That sounds good to me. It kinda makes sense that if you're carrying around partition values you'd carry around info on what columns they belong to. Maybe it will help resolve #13270 as well in the future.

BTW the other thing I somewhat worry about with reapplying pruning during file opening is that it is in the critical path and will directly add to the query latency. I wonder if there is some way to ensure we hide it behind IO if possible (aka make sure we are applying the extra pruning while the next file is opened rather than waiting to do it before starting that IO).

I think we can move it a couple lines lower into Ok(Box::pin(async move { and that will do the trick? As long as it happens before we load the Parquet metadata the overhead is minimal. There's probably other stuff we could move into there if that's a concern.
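
(A hedged sketch of that shape; `should_prune` and the local FileOpenFuture alias are stand-ins, not DataFusion's exact definitions:)

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use futures::future::BoxFuture;
use futures::stream::{self, BoxStream, StreamExt};

// Local stand-in for DataFusion's FileOpenFuture.
type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;

// The pruning check runs inside the async block, so open() returns
// immediately and the check overlaps with IO for other files. If the file
// is pruned, no parquet metadata is ever fetched.
fn open_sketch(should_prune: impl FnOnce() -> bool + Send + 'static) -> Result<FileOpenFuture> {
    Ok(Box::pin(async move {
        if should_prune() {
            // File pruned: return an empty stream before any IO happens.
            return Ok(stream::iter(Vec::<Result<RecordBatch>>::new()).boxed());
        }
        // ... fetch parquet metadata and build the real stream here ...
        Ok(stream::iter(Vec::<Result<RecordBatch>>::new()).boxed())
    }))
}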

@github-actions github-actions bot added the common and proto labels May 15, 2025
@adriangb adriangb force-pushed the late-pruning-files branch from e8eb87f to cc120d0 on May 15, 2025 03:30
@github-actions github-actions bot added the documentation label May 15, 2025
@adriangb
Contributor Author

@alamb @xudong963 I've pushed a change that:

  1. Moves PruningStatistics into common.
  2. Adds composable helpers to prune based on Vec<Statistics> (multiple files / partitions) and Vec<Vec<ScalarValue>> (multiple containers of partition values).
  3. Adds partition_fields: Vec<FieldRef> to ParquetOpener, with slight tweaks to FileScanConfig (the latter is a bit of a PITA because of how it's both a struct and its own builder).
  4. Implements the pruning inside the IO work so that it's deferred, as Andrew asked.
  5. Sets us up nicely to pipe the partition values into the other stages of pruning (row group stats, page stats and row filters). Leaving this for future work though.

@alamb
Contributor

alamb commented May 15, 2025

I will review this more carefully later today

@alamb alamb mentioned this pull request May 15, 2025
@adriangb
Contributor Author

@alamb I took a look at the test failures and it seems to me that the tests expected pruning at the row group stats level but it's now happening at the file level, which is a good thing 😄! But it makes the tests fail 😢. They're macro-generated tests, which is a bit wonky... it may need a more careful eye to figure out how to rejig the tests to accept the new pruning. I'm at a conference all of this week and on PTO next week so I'm not sure I'll be able to get to it, but I'll try; if someone else can look that'd be great.

Contributor

@alamb alamb left a comment

TLDR is I think this is really nice and very powerful @adriangb. The only thing I think is needed prior to merge is to figure out some way to avoid trying to prune files when we know an attempt has already been made at planning time.

Maybe we can break this PR up into some smaller chunks:

  1. Move PruningStatistics to datafusion-common (that is an easy one)
  2. Potentially one for CompositePruningStatistics and PrunableStatistics
  3. The final one that hooks it all up on the FileOpener

}
}

pub struct CompositePruningStatistics {
Contributor

This is a very fancy idea -- it probably needs some more comments about what it does (namely, it combines multiple sources together: if one pruning statistics source doesn't have information for a particular column, it tries the other PruningStatistics in turn).

Contributor Author

I added some more docs 😄
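
(For readers following the thread, a hedged sketch of that per-column fallback; this is not the PR's code and the import paths are approximate:)

use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray};
use datafusion_common::{Column, ScalarValue};
use datafusion_physical_optimizer::pruning::PruningStatistics;

/// Combines multiple PruningStatistics sources; for each column, each
/// source is asked in turn and the first that has information wins.
pub struct CompositePruningStatistics {
    statistics: Vec<Arc<dyn PruningStatistics>>,
}

impl PruningStatistics for CompositePruningStatistics {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.min_values(column))
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.max_values(column))
    }

    fn num_containers(&self) -> usize {
        // All sources must describe the same set of containers.
        self.statistics.first().map_or(0, |s| s.num_containers())
    }

    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.null_counts(column))
    }

    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.row_counts(column))
    }

    fn contained(&self, column: &Column, values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
        self.statistics.iter().find_map(|s| s.contained(column, values))
    }
}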

let enable_page_index = self.enable_page_index;

Ok(Box::pin(async move {
// Prune this file using the file level statistics.
Contributor

I worry that in the case when there aren't any dynamic predicates, trying to prune again on file opening is simply going to be pure overhead / wasteful.

Therefore, I think it would be good if we could somehow control / disable trying to apply this extra filtering when it is known it would not help

For example, maybe we can have a field on ParquetOpener with something like prune_on_open which can be set to true if there are dynamic predicates present.

This would also likely ensure the tests can pass again

Contributor Author

which can be set to true if there are dynamic predicates present

the issue is: how do we know the filters are dynamic? we've hidden dynamic filters behind PhysicalExpr so that the system can treat them as normal filters. we could do any filter pushdown but that doesn't seem like much of an improvement.

I also think this pruning should be quite cheap / the record batches being filtered are just a couple rows

vec![vec![]; partition_schema.fields().len()];
for partition_value in partition_values.iter() {
for (i, value) in partition_value.iter().enumerate() {
partition_values_by_column[i].push(value.clone());
Contributor

it would be great to avoid these clones if possible
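
(One possible shape for that, assuming the caller can hand over the row-major values by value; the names mirror the snippet above but are hypothetical:)

use datafusion_common::ScalarValue;

// Transpose row-major partition values (one Vec per file) into column-major
// Vecs by moving each ScalarValue instead of cloning it.
fn transpose(partition_values: Vec<Vec<ScalarValue>>, num_columns: usize) -> Vec<Vec<ScalarValue>> {
    let mut by_column: Vec<Vec<ScalarValue>> = vec![Vec::new(); num_columns];
    for row in partition_values {
        for (i, value) in row.into_iter().enumerate() {
            by_column[i].push(value);
        }
    }
    by_column
}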

parquet_file_reader_factory: Arc::new(
DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
),
partition_fields: vec![],
Contributor

we probably need a test for pruning on partition_fields as well

) -> Option<BooleanArray>;
}

pub struct PartitionPruningStatistics {
Contributor

I think we should document this struct, specifically including information about how the partition values are mapped to the main schema


/// Prune a set of containers represented by their statistics.
/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
pub struct PrunableStatistics {
Contributor

I think this is a good pattern -- it turns out we have something very similar in influxdb_iox:

https://github.com/influxdata/influxdb3_core/blob/af9fabea05e2135a094a69dc5b7d549e713420f9/iox_query/src/pruning.rs#L157

/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
pub struct PrunableStatistics {
/// Statistics for each container.
statistics: Vec<Arc<Statistics>>,
Contributor

I suspect we could just use references here and save a bunch of Arcs (not a big deal), something like:

Suggested change
statistics: Vec<Arc<Statistics>>,
statistics: Vec<&'a Statistics>,

Contributor Author

I tried this but it turned out to be tricky given that all of this ends up in a boxed future, etc.

@adriangb
Contributor Author

Let's start with #16069

@adriangb
Contributor Author

My plan for this PR now is to first resolve blockers. In particular:

And then come back here and resolve the rest of the points of discussion.


Successfully merging this pull request may close these issues.

Pass PartitionedFile into FileSource for late file stats based pruning