Add late pruning of file based on file level statistics #16014
Conversation
A couple of thoughts: …
Yes, this would be super nice -- the more we can do to consolidate statistics / pruning, the better off the code will be, I think. Right now it is kind of scattered in several places.
You can use something like https://docs.rs/futures/latest/futures/stream/fn.iter.html perhaps -- like …
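(The example after "like" is cut off in this capture; as a minimal, generic illustration of what `futures::stream::iter` does -- turn an already-materialized, possibly empty, collection into a `Stream` -- not the original snippet:)

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    // stream::iter adapts any IntoIterator into a Stream; an empty Vec
    // yields a stream that completes immediately, which is handy when a
    // fully pruned file should produce no batches at all.
    block_on(async {
        let mut s = stream::iter(vec![1, 2, 3]);
        while let Some(v) = s.next().await {
            println!("{v}");
        }
    });
}
```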
```diff
@@ -367,7 +368,7 @@ impl Default for OnError {
 pub trait FileOpener: Unpin + Send + Sync {
     /// Asynchronously open the specified file and return a stream
     /// of [`RecordBatch`]
-    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
+    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
```
Isn't it sufficient to provide only the file statistics? PartitionedFile seems like overkill to me.
Maybe? But I feel like since we have the partitioned file, we might as well pass it in. Maybe we use it in the future to enable optimizations that use the partition values (e.g. late pruning based on partition values, including partition values in the scan so that more filters can be evaluated, etc.)
I think using PartitionedFile as the "data we have at plan time", including statistics and potentially information about size, encryption, special indexes, etc., makes a lot of sense.
> Maybe? But I feel like since we have the partitioned file, we might as well pass it in. Maybe we use it in the future to enable optimizations that use the partition values (e.g. late pruning based on partition values, including partition values in the scan so that more filters can be evaluated, etc.)
I believe these can also be inferred from statistics in a more generalized fashion (I don't know whether partition columns exist in column_statistics now), but not a big deal, we can keep this 👍🏻
Can you please update the documentation for open() to mention that `file` has plan-time per-file information (such as statistics), and leave a doc link back?
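(One possible wording for that doc comment -- a sketch only; the exact text and link are up to the PR:)

```rust
/// Asynchronously open the specified file and return a stream
/// of [`RecordBatch`].
///
/// The `file` argument carries the plan-time, per-file information
/// (such as statistics and partition values) gathered when the scan
/// was planned; see [`PartitionedFile`] for details.
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
```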
The idea makes a lot of sense. I have one implementation suggestion. Thanks again @adriangb
(force-pushed from 0e03bdc to 94726cc)
@alamb please review again, I implemented it and added a test 😄
```rust
(Some(stats), Some(predicate)) => {
    let pruning_predicate = build_pruning_predicate(
        Arc::clone(predicate),
        &self.table_schema,
```
Should it use `table_schema` here?
```rust
match (&file.statistics, &self.predicate) {
    (Some(stats), Some(predicate)) => {
```
Given that there is only one branch, I suggest using `if let (Some(_), Some(_)) = xxx {}` here.
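(As a tiny self-contained illustration of the suggested rewrite, with stand-in values rather than the PR's types:)

```rust
// A match with a single meaningful arm reads more directly as `if let`.
fn main() {
    let statistics = Some("stats");
    let predicate = Some("pred");
    if let (Some(stats), Some(pred)) = (statistics, predicate) {
        println!("prune using {stats} and {pred}");
    }
}
```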
Very cool -- I think this is very close
```rust
    }
}

/// Returns [`BooleanArray`] where each row represents information known
```
This comment can probably be trimmed, with a link back to the original trait source.
```diff
@@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>(
     })
 }

 /// Prune a set of containers represented by their statistics.
```
This is a nice structure -- I think it makes lots of sense.

Specifically, I thought there was already code that pruned individual files based on statistics, but I could not find any in ListingTable (we have something like this in influxdb_iox).

My opinion is that if we are going to add this code to the DataFusion codebase, we should:
- Ensure that it helps as many users as possible
- Make sure it is executed as much as possible (to ensure test coverage)

Thus, what do you think about using the PrunableStatistics to prune the FileGroup in ListingTable here?

Pruning on statistics at plan time would potentially be redundant with also trying to prune again during opening, but it would reduce the files earlier in the plan.
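(A rough sketch of that plan-time idea, assuming `PrunableStatistics` implements the existing `PruningStatistics` trait with one container per file; the `prune_files` helper and `String` stand-ins are hypothetical, while `PruningPredicate::prune` is the existing API:)

```rust
use datafusion::error::Result;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

// Keep only the files whose statistics say they *may* match the predicate.
fn prune_files<S: PruningStatistics>(
    predicate: &PruningPredicate,
    stats: &S,
    files: Vec<String>, // stand-in for the FileGroup's PartitionedFiles
) -> Result<Vec<String>> {
    // `prune` returns one bool per container: true means "might match".
    let keep = predicate.prune(stats)?;
    Ok(files
        .into_iter()
        .zip(keep)
        .filter_map(|(file, k)| k.then_some(file))
        .collect())
}
```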
How about I bundle in the PartitionValues somehow and then we can re-use and compose that?
Specifically:
- TableProviders use just the partition values
- ParquetOpener combines both
- Something else can use just the stats
> Pruning on statistics at plan time would potentially be redundant with also trying to prune again during opening, but it would reduce the files earlier in the plan.

Yeah, I don't think it's redundant: you either prune or you don't. If we prune earlier, the files don't make it this far. If we don't, we may now be able to prune them. What would be redundant is if there are no changes to the filters (i.e. no dynamic filters), but that sounds both hard to track and like a possible future optimization 😄
kk
```rust
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
/// and [`Self::row_counts`].
fn num_containers(&self) -> usize {
    1
```
This should be `self.statistics.len()`, right?
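(In other words, something like the following -- a fragment sketch; the remaining trait methods are elided:)

```rust
impl PruningStatistics for PrunableStatistics {
    /// One container per Statistics entry, so the count lines up with the
    /// arrays returned by min_values / max_values / null_counts / row_counts.
    fn num_containers(&self) -> usize {
        self.statistics.len()
    }
    // ... other PruningStatistics methods ...
}
```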
@alamb I pushed 4607643, which adds some nice APIs for partition values. In particular I think it's important to have a way to prune based on partition values + file level statistics (#15935). However I can't implement it for …
Maybe it is time to make a …
FYI @xudong963 I think this is relevant to your work on statistics / partition pruning as well.
Seems reasonable to me. I guess it'd be at the same level as …
Moving to … Next hurdle: at this point we've long lost information on the actual table schema / partition files.

I think any of these also sets us up to refactor how the partition filters actually get applied (i.e. we don't have to inject them in the …). @alamb any preference?
Generally LGTM, thank you
```rust
if let (Some(stats), Some(predicate)) = (&file.statistics, &self.predicate) {
    let pruning_predicate = build_pruning_predicate(
        Arc::clone(predicate),
        &self.table_schema,
```
Is it reasonable to use `table_schema` here?
It's the only schema we have. And it's not even really the table schema; the name is misleading for historical reasons.
It'd be better to add some notes about it. (I often get confused when reading the parquet code, all those kinds of schemas, lol)
datafusion/datasource-parquet/src/opener.rs, lines 182 to 185 in 4607643:

```rust
// Note about schemas: we are actually dealing with **3 different schemas** here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
```
I think the next step here is to resolve #16014 (comment). In my mind it makes sense to both push down the information and continue to have the ability to do it after the scan.
I think we should try and avoid moving everything to datafusion_common. Since the pruning stuff relies on PhysicalExpr, I don't think we can directly put it in datafusion_common.
I think this sounds like the most straightforward thing to me and the easiest way to get the required information. Seems like … Maybe we can do something like this (change to use a FieldRef rather than Field to avoid copies):

```rust
pub struct PartitionedFile {
    ...
    pub partition_values: Vec<ScalarValue>,
    ...
}
```

to

```rust
pub struct PartitionedFile {
    ...
    pub partition_values: Vec<(FieldRef, ScalarValue)>,
    ...
}
```
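(If PartitionedFile carried the pairs, downstream code could derive the partition-column schema directly from them. A sketch under that assumption; `partition_schema` is a hypothetical helper:)

```rust
use std::sync::Arc;
use arrow::datatypes::{FieldRef, Schema};
use datafusion_common::ScalarValue;

// Build the schema for partition-column pruning straight from the
// (FieldRef, ScalarValue) pairs carried on PartitionedFile.
fn partition_schema(partition_values: &[(FieldRef, ScalarValue)]) -> Schema {
    let fields: Vec<FieldRef> = partition_values
        .iter()
        .map(|(field, _)| Arc::clone(field))
        .collect();
    Schema::new(fields)
}
```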
BTW, the other thing I somewhat worry about with reapplying pruning during file opening is that it is in the critical path and will directly add to query latency. I wonder if there is some way to ensure we have hidden it behind IO if possible (aka make sure we are applying the extra pruning while the next file is opened, rather than waiting to do it before starting that IO).
That sounds good to me. It kind of makes sense that if you're carrying around partition values, you'd carry around info on what columns they belong to. Maybe it will help resolve #13270 as well in the future.
I think we can move it a couple lines lower into …
(force-pushed from e8eb87f to cc120d0)
@alamb @xudong963 I've pushed a change that: …
I will review this more carefully later today.
@alamb I took a look at the test failures, and it seems to me that the tests expected pruning at the row group stats level, but it's now happening at the file level, which is a good thing 😄! But it makes the tests fail 😢. They're macro-generated tests, which is a bit wonky... it may need a more careful eye to figure out how to rejig the tests to accept the new pruning. I'm at a conference all of this week and on PTO next week, so I'm not sure I'll be able to get to it, but I'll try; if someone else can look, that'd be great.
TLDR is I think this is really nice and very powerful @adriangb. The only thing I think is needed prior to merge is to figure out some way to avoid trying to prune files when we know an attempt has already been made at planning time.

Maybe we can break this PR up into some smaller chunks:
- Move PartitionStatistics to datafusion-common (that is an easy one)
- Potentially one for CompositePruningStatistics and PrunableStatistics
- The final one that hooks it all up in the FileOpener
```rust
    }
}

pub struct CompositePruningStatistics {
```
This is a very fancy idea -- it probably needs some more comments about what it does (namely, it combines multiple sources together, where if one pruning statistics source doesn't have information for a particular column, it tries the other PruningStatistics in turn).
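(For illustration, the fall-through being described might look like this -- assuming boxed `PruningStatistics` sources; names are illustrative:)

```rust
use arrow::array::ArrayRef;
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::Column;

// Ask each inner source in order; the first one that knows the column wins.
fn composite_min_values(
    sources: &[Box<dyn PruningStatistics>],
    column: &Column,
) -> Option<ArrayRef> {
    sources.iter().find_map(|s| s.min_values(column))
}
```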
I added some more docs 😄
```rust
let enable_page_index = self.enable_page_index;

Ok(Box::pin(async move {
    // Prune this file using the file level statistics.
```
I worry that in the case when there aren't any dynamic predicates, trying to prune again on file opening is simply going to be pure overhead / wasteful.

Therefore, I think it would be good if we could somehow control / disable trying to apply this extra filtering when it is known it would not help.

For example, maybe we can have a field on ParquetOpener with something like prune_on_open, which can be set to true if there are dynamic predicates present.

This would also likely ensure the tests can pass again.
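(To make the suggestion concrete, a hypothetical sketch -- the `prune_on_open` field comes from this comment, not the actual codebase:)

```rust
// Hypothetical: gate the open-time pruning pass behind a flag the planner
// sets only when re-pruning could help (e.g. dynamic predicates exist).
struct ParquetOpenerConfig {
    prune_on_open: bool,
}

fn should_prune_on_open(cfg: &ParquetOpenerConfig, has_file_stats: bool) -> bool {
    // Only pay the pruning cost when there are statistics to prune with
    // and the planner asked for it.
    cfg.prune_on_open && has_file_stats
}
```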
> which can be set to true if there are dynamic predicates present

The issue is: how do we know the filters are dynamic? We've hidden dynamic filters behind PhysicalExpr so that the system can treat them as normal filters. We could do any filter pushdown, but that doesn't seem like much of an improvement.

I also think this pruning should be quite cheap / the record batches being filtered are just a couple of rows.
datafusion/common/src/pruning.rs (outdated):
```rust
    vec![vec![]; partition_schema.fields().len()];
for partition_value in partition_values.iter() {
    for (i, value) in partition_value.iter().enumerate() {
        partition_values_by_column[i].push(value.clone());
```
it would be great to avoid these clones if possible
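(One way to avoid them, assuming the opener can take the row-wise values by value rather than by reference -- a sketch, not necessarily the PR's eventual fix:)

```rust
use datafusion_common::ScalarValue;

// Transposing with into_iter() moves each ScalarValue instead of cloning it.
fn transpose(rows: Vec<Vec<ScalarValue>>, num_columns: usize) -> Vec<Vec<ScalarValue>> {
    let mut by_column: Vec<Vec<ScalarValue>> = vec![Vec::new(); num_columns];
    for row in rows {
        for (i, value) in row.into_iter().enumerate() {
            by_column[i].push(value); // moved, not cloned
        }
    }
    by_column
}
```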
```rust
parquet_file_reader_factory: Arc::new(
    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
),
partition_fields: vec![],
```
we probably need a test for pruning on partition_fields as well
```rust
) -> Option<BooleanArray>;
}

pub struct PartitionPruningStatistics {
```
I think we should document this struct, specifically including information about how the partition values are mapped to the main schema.
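(A possible doc sketch; the wording is illustrative, not the PR's final text:)

```rust
/// Prunes containers using only their partition values.
///
/// Each container corresponds to one file. `partition_values[i]` holds the
/// values for file `i`, ordered to match the partition fields. Partition
/// columns are virtual: they come from the file's path rather than the
/// physical file schema, so they are matched to the table schema by field
/// name when a predicate references them.
pub struct PartitionPruningStatistics { /* fields elided */ }
```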
```rust
/// Prune a set of containers represented by their statistics.
/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
pub struct PrunableStatistics {
```
I think this is a good pattern -- it turns out we have something very similar in influxdb_iox: …
```rust
/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
pub struct PrunableStatistics {
    /// Statistics for each container.
    statistics: Vec<Arc<Statistics>>,
```
I suspect we could just use references here and save a bunch of Arcs (not a big deal), but something like:

```diff
-    statistics: Vec<Arc<Statistics>>,
+    statistics: Vec<&'a Statistics>,
```
I tried this but it turned out to be tricky given that all of this ends up in a boxed future, etc.
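(For context on why the borrow is tricky: FileOpenFuture is roughly a `Pin<Box<dyn Future + Send + 'static>>` -- an approximation, not the exact alias -- so anything the future captures must be `'static`; borrowed statistics cannot move into it, while Arcs can. A minimal stand-in:)

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

// String stands in for Statistics; the point is ownership, not the payload.
fn make_future(
    stats: Vec<Arc<String>>,
) -> Pin<Box<dyn Future<Output = usize> + Send + 'static>> {
    // Arc'd data satisfies the 'static bound; Vec<&'a String> would not.
    Box::pin(async move { stats.len() })
}
```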
Let's start with #16069.
My plan for this PR now is to first resolve blockers. In particular:

- … PartitionedFile into FileSource for late file stats based pruning (#16000)

And then come back here and resolve the rest of the points of discussion.