Optimize group by with limit#22009
Conversation
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing optimize-clickbench-q17-limit (dbd05b5) to 948cd09 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing optimize-clickbench-q17-limit (dbd05b5) to 948cd09 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing optimize-clickbench-q17-limit (dbd05b5) to 948cd09 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing optimize-clickbench-q17-limit (d611742) to 948cd09 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing optimize-clickbench-q17-limit (d611742) to 948cd09 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing optimize-clickbench-q17-limit (d611742) to 948cd09 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
@Dandandan , wow ! |
Yeah this is a pretty nice find 😄 |
|
I vaguely remember there have been previous attempts to do this (optimize GROUP BY with a LIMIT without an ORDER BY) but I can't find it now While I think these queries are of limited practical import (it basically returns some arbitrary subset of the groups) given it has come up a few times, I think we should pursue this (especially since the code is relatively simple) |
| Limit: skip=0, fetch=2 | ||
| Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]] | ||
| TableScan: test | ||
| RightSemi Join: test.a = test.a |
There was a problem hiding this comment.
Did you consider adding a LIMIT directly to the GroupByHash operator? I am not sure how much extra complexity that is, but you could probably model similarly to a SUM(a FILTER key IN <hash table>)
Though it might add a lot more complexity 🤔
Or we could implement a special GroupBy operator, similar to what @avantgardnerio implemented for GROUPBY with a limit https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/topk)
| Ok(Transformed::yes(make_limit(skip, fetch, Arc::new(input)))) | ||
| } | ||
|
|
||
| /// Rewrite `LIMIT K (GROUP BY keys, aggs)` into a key preselection followed |
There was a problem hiding this comment.
I would probably describe this more like:
SELECT aggs(...) GROUP BY keys LIMIT k
Which issue does this PR close?
Rationale for this change
ClickBench q17 aggregates by
UserID, SearchPhraseand applies a small LIMIT. Building the full aggregate state can use substantially more memory than needed for queries where any limited set of grouping keys is sufficient.This PR preselects a limited set of grouping keys before running the original aggregate, allowing existing join dynamic filtering to push that key set into the second input scan.
What changes are included in this PR?
PushDownLimitto rewrite limited grouped aggregates with aggregate expressions whenenable_distinct_aggregation_soft_limitis enabled.LIMIT K (GROUP BY keys)as a key preselection plan, then right-semi joins the original input against those keys before the final aggregate.Are these changes tested?
Yes, existing tests
Are there any user-facing changes?
No API changes.