feat: adaptive filter selectivity tracking for Parquet row filters #19639
Conversation
This is probably not a good issue to pick up. This is a draft PR for an unproven idea.
Did you find any evidence that the selectivity of predicates changes over the course of the query (or, put another way, that reordering them during execution would help)?
One case where that definitely happens is dynamic filters 😉 (although we treat each version of one as a different filter, the point is that we need to be dynamic about new filters showing up mid-query). But I also think it's not unusual to have unevenly distributed data across files. I do agree that changing how we apply a filter between files probably captures 95% of the benefit (as opposed to changing it within the scan of a single file). But the main point is that we start from "we know nothing", which we treat as "nothing is selective", and then once we know a filter is selective enough we move it over to be a row filter.
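To make that concrete, here is a minimal sketch of the decision described above (illustrative only, not the PR's code; `FilterDisposition`, the function name, and the 0.8 threshold are made up for the example): a filter with no observed history is treated as non-selective and stays post-scan, and it is only promoted to a Parquet row filter once its observed effectiveness crosses the threshold.

```rust
/// Where a predicate should be evaluated for the next file.
#[derive(Debug, PartialEq)]
enum FilterDisposition {
    /// Pushed down as a Parquet row filter (evaluated during the scan).
    RowFilter,
    /// Applied after the scan, on already-decoded batches.
    PostScan,
}

/// Decide the disposition from the effectiveness observed so far.
/// `effectiveness` is `None` when the filter has never been evaluated.
fn disposition(effectiveness: Option<f64>, threshold: f64) -> FilterDisposition {
    match effectiveness {
        // "We know nothing" is treated as "nothing is selective": keep it post-scan.
        None => FilterDisposition::PostScan,
        Some(e) if e >= threshold => FilterDisposition::RowFilter,
        Some(_) => FilterDisposition::PostScan,
    }
}

fn main() {
    // A brand-new filter (e.g. a dynamic filter that just showed up) stays post-scan.
    assert_eq!(disposition(None, 0.8), FilterDisposition::PostScan);
    // Once previous files show it removes >= 80% of rows, promote it.
    assert_eq!(disposition(Some(0.93), 0.8), FilterDisposition::RowFilter);
    assert_eq!(disposition(Some(0.40), 0.8), FilterDisposition::PostScan);
}
```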
Hey @adriangb, I've been thinking about something like this since the New Year. It's really cool to see you putting together a draft for it. I haven't had a chance to give your code a full go, but I wanted to share some research I did earlier that might be relevant:
Before seeing your PR and comments in #3463 I was thinking about using simpler heuristics for sorting predicates.
From a quick skim of the original ClickHouse PR, they still rely on some simple heuristics when column statistics aren't available. I would like to give your PR a proper review once I'm home, but I already love the direction you're taking.
I've pushed a commit which has
@adriangb, upon further reading, PREWHERE is not performing row group pruning; it evaluates predicate expressions right after that, at the row filter stage. However, I don't think it decides which columns to push into the scan and which to demote to post-scan, so it's not that relevant here. Originally, I hadn't read your comments and PR carefully enough, so I was under the impression that you were trying to use row group statistics to improve filter ordering (what PREWHERE in ClickHouse is now doing). I now understand that your approach is more like dynamic programming, where we estimate selectivity at runtime per predicate per file, which is indeed very different from the ClickHouse implementation.
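To illustrate what "estimate selectivity at runtime per predicate per file" could look like, here is a rough sketch (hypothetical names, not the PR's actual `SelectivityStats`): accumulate, per predicate, the rows it saw and the rows it removed as files finish scanning, and derive an effectiveness ratio from the running totals.

```rust
/// Running counts for one predicate, accumulated as files finish scanning.
#[derive(Default)]
struct PredicateStats {
    rows_evaluated: u64,
    rows_filtered: u64, // rows the predicate removed
}

impl PredicateStats {
    /// Record the outcome of evaluating the predicate over one file.
    fn record_file(&mut self, rows_in: u64, rows_out: u64) {
        self.rows_evaluated += rows_in;
        self.rows_filtered += rows_in.saturating_sub(rows_out);
    }

    /// Fraction of rows removed so far; `None` until we have observations.
    fn effectiveness(&self) -> Option<f64> {
        (self.rows_evaluated > 0)
            .then(|| self.rows_filtered as f64 / self.rows_evaluated as f64)
    }
}

fn main() {
    let mut stats = PredicateStats::default();
    assert_eq!(stats.effectiveness(), None); // nothing observed yet
    stats.record_file(1_000, 50);  // file 1: removed 950 of 1000 rows
    stats.record_file(2_000, 400); // file 2: removed 1600 of 2000 rows
    // 2550 removed out of 3000 evaluated -> 0.85
    assert_eq!(stats.effectiveness(), Some(0.85));
}
```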
I wonder if a good strategy could be:
Another way of testing whether parallelization is the issue is decreasing
Running some tests in #19694
Running some tests locally, it seems that the bad performance is mostly related to join pushdown. Setting
Ok - that looks better (#19694 (comment)):
So I guess the main factor is expressions like this being super expensive to evaluate (query 9):
#19694 (comment)
So I would suggest taking the following steps
I wonder if it's the expression being expensive to evaluate, or whether evaluating it where it currently sits causes the issue. That is, if this were evaluated in a
I guess we need to test both of those to understand how each one impacts the results...
I tested both: the join filter pushdown has the dramatic (bad) impact on TPC-H performance, making it overall only slightly better than without predicate pushdown; DATAFUSION_OPTIMIZER_REPARTITION_FILE_MIN_SIZE has only minimal impact.
I am not sure that would help. I guess join pushdown mostly helps if a lot of IO can be avoided and the expression is not super expensive to evaluate, which I think won't get better by moving it inside
Summary
This PR implements cross-file tracking of filter selectivity in ParquetSource to adaptively reorder and demote low-selectivity filters, as discussed in #3463 (comment).
Key changes:
- `SelectivityTracker` to track filter effectiveness across files, using an `ExprKey` wrapper for structural equality
- `ParquetOpener` queries shared stats to partition filters into row filters (push down) vs post-scan filters (inline application)
- Post-scan filters are applied via `apply_post_scan_filters()`, then filter columns are removed from the output
- `SelectivityUpdatingStream` wrapper updates the tracker when the stream completes
- `build_row_filter_with_metrics()` returns per-filter metrics for selectivity tracking
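As a rough illustration of how these pieces could fit together (a hedged sketch, not the actual implementation: the real `ExprKey` presumably wraps the physical expression itself, whereas this sketch keys on a display string for simplicity, and the TPC-H-style predicates are arbitrary examples):

```rust
use std::collections::HashMap;

/// Simplified stand-in for `ExprKey`: identify a predicate by a canonical
/// string form so structurally equal predicates share one stats entry.
type ExprKey = String;

#[derive(Default)]
struct SelectivityTracker {
    /// Observed effectiveness (fraction of rows removed) per predicate.
    stats: HashMap<ExprKey, f64>,
}

impl SelectivityTracker {
    /// Split candidate predicates into row filters (pushed into the Parquet
    /// decoder) and post-scan filters, based on observed effectiveness.
    fn partition_filters(
        &self,
        candidates: Vec<ExprKey>,
        threshold: f64,
    ) -> (Vec<ExprKey>, Vec<ExprKey>) {
        candidates.into_iter().partition(|key| {
            // Unknown predicates default to post-scan until proven selective.
            self.stats.get(key).is_some_and(|e| *e >= threshold)
        })
    }
}

fn main() {
    let mut tracker = SelectivityTracker::default();
    tracker.stats.insert("l_shipdate <= '1998-09-02'".into(), 0.95);
    tracker.stats.insert("l_comment LIKE '%green%'".into(), 0.10);

    let (row_filters, post_scan) = tracker.partition_filters(
        vec![
            "l_shipdate <= '1998-09-02'".into(),
            "l_comment LIKE '%green%'".into(),
            "l_quantity < 24".into(), // never seen before
        ],
        0.8,
    );
    assert_eq!(row_filters, vec!["l_shipdate <= '1998-09-02'".to_string()]);
    assert_eq!(post_scan.len(), 2);
}
```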
Configuration:
- `parquet_options.filter_effectiveness_threshold` (default: 0.8)
Files added:
- `datafusion/datasource-parquet/src/selectivity.rs` - Core tracking infrastructure
Files modified:
- `opener.rs` - Filter partitioning, post-scan application, `SelectivityUpdatingStream`
- `row_filter.rs` - `FilterMetrics`, `RowFilterWithMetrics`, effectiveness-based reordering
- `source.rs` - `selectivity_tracker` field and builder methods
- `config.rs` - `filter_effectiveness_threshold` config option

Test plan
- `ExprKey` hash/eq consistency
- `SelectivityStats::effectiveness()` edge cases
- `SelectivityTracker::partition_filters()` threshold logic

🤖 Generated with Claude Code