-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-44571][SQL] Merge subplans with one row result #53019
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[SPARK-44571][SQL] Merge subplans with one row result #53019
Conversation
|
@cloud-fan , @dongjoon-hyun, @beliefer, this is the next step I would like to improve Spark's plan merging logic with. It is similar to the previous #42223 attempt, but uses the common plan merging logic I extracted to |
| @@ -1,241 +0,0 @@ | |||
| /* | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately GitHub doesn't recognize this rule file rename from MergeScalarSubqueries.scala to MergeSubplans.scala, but the 2 share a lot in common.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reverted the scala docs change of MergeSubplans in 335b593 for now to make the 2 files much similar. GitHub recognizes the file rename now and shows the real diff. This way the review should be much easier.
I can update the scala docs to the following in a follow-up PR:
/**
* This rule tries to merge multiple subplans that have one row result. This can be either the plan
* tree of a [[ScalarSubquery]] expression or the plan tree starting at a non-grouping [[Aggregate]]
* node.
*
* The process is the following:
* - While traversing through the plan each one row returning subplan is tried to merge into already
* seen one row returning subplans using `PlanMerger`s.
* During this first traversal each [[ScalarSubquery]] expression is replaced to a temporal
* [[ScalarSubqueryReference]] and each non-grouping [[Aggregate]] node is replaced to a temporal
* [[NonGroupingAggregateReference]] pointing to its possible merged version in `PlanMerger`s.
* `PlanMerger`s keep track of whether a plan is a result of merging 2 or more subplans, or is an
* original unmerged plan.
* [[ScalarSubqueryReference]]s and [[NonGroupingAggregateReference]]s contain all the required
* information to either restore the original subplan or create a reference to a merged CTE.
* - Once the first traversal is complete and all possible merging have been done, a second
* traversal removes the references to either restore the original subplans or to replace the
* original to a modified ones that reference a CTE with a merged plan.
* A modified [[ScalarSubquery]] is constructed like:
* `GetStructField(ScalarSubquery(CTERelationRef to the merged plan), merged output index)`
* ans a modified [[Aggregate]] is constructed like:
* ```
* Project(
* Seq(
* GetStructField(
* ScalarSubquery(CTERelationRef to the merged plan),
* merged output index 1),
* GetStructField(
* ScalarSubquery(CTERelationRef to the merged plan),
* merged output index 2),
* ...),
* OneRowRelation)
* ```
* where `merged output index`s are the index of the output attributes (of the CTE) that
* correspond to the output of the original node.
* - If there are merged subqueries in `PlanMerger`s then a `WithCTE` node is built from these
* queries. The `CTERelationDef` nodes contain the merged subplans in the following form:
* `Project(Seq(CreateNamedStruct(name1, attribute1, ...) AS mergedValue), mergedSubplan)`.
* The definitions are flagged that they host a subplan, that can return maximum one row.
*
* Here are a few examples:
*
* 1. a query with 2 subqueries:
* ```
* Project [scalar-subquery [] AS scalarsubquery(), scalar-subquery [] AS scalarsubquery()]
* : :- Aggregate [min(a) AS min(a)]
* : : +- Relation [a, b, c]
* : +- Aggregate [sum(b) AS sum(b)]
* : +- Relation [a, b, c]
* +- OneRowRelation
* ```
* is optimized to:
* ```
* WithCTE
* :- CTERelationDef 0
* : +- Project [named_struct(min(a), min(a), sum(b), sum(b)) AS mergedValue]
* : +- Aggregate [min(a) AS min(a), sum(b) AS sum(b)]
* : +- Relation [a, b, c]
* +- Project [scalar-subquery [].min(a) AS scalarsubquery(),
* scalar-subquery [].sum(b) AS scalarsubquery()]
* : :- CTERelationRef 0
* : +- CTERelationRef 0
* +- OneRowRelation
* ```
*
* 2. a query with 2 non-grouping aggregates:
* ```
* Join Inner
* :- Aggregate [min(a) AS min(a)]
* : +- Relation [a, b, c]
* +- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
* +- Relation [a, b, c]
* ```
* is optimized to:
* ```
* WithCTE
* :- CTERelationDef 0
* : +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
* : +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
* : +- Relation [a, b, c]
* +- Join Inner
* :- Project [scalar-subquery [].min(a) AS min(a)]
* : : +- CTERelationRef 0
* : +- OneRowRelation
* +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
* : :- CTERelationRef 0
* : +- CTERelationRef 0
* +- OneRowRelation
* ```
*
* 3. a query with a subquery and a non-grouping aggregate:
* ```
* Join Inner
* :- Project [scalar-subquery [] AS scalarsubquery()]
* : : +- Aggregate [min(a) AS min(a)]
* : : +- Relation [a, b, c]
* : +- OneRowRelation
* +- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
* +- Relation [a, b, c]
* ```
* is optimized to:
* ```
* WithCTE
* :- CTERelationDef 0
* : +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
* : +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
* : +- Relation [a, b, c]
* +- Join Inner
* :- Project [scalar-subquery [].min(a) AS scalarsubquery()]
* : : +- CTERelationRef 0
* : +- OneRowRelation
* +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
* : :- CTERelationRef 0
* : +- CTERelationRef 0
* +- OneRowRelation
* ```
*/
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pinging me, @peter-toth .
|
cc @yaooqinn, @LuciferYang , @wangyum , too. |
| Batch("MergeScalarSubqueries", Once, | ||
| MergeScalarSubqueries, | ||
| Batch("MergeSubplans", Once, | ||
| MergeSubplans, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While looking at the code, I become curious. Can we have this improvement as a new rule, @peter-toth ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very good question and I too was thinking about it lot. Actually, initially I was trying to add a new rule to handle Aggregate nodes without grouping expressions in join groups only, but then I realized that having more rules just adds more complexity.
Our goal is to recognize mergeable subparts of the whole plan and extract it to CTEs and just referece them at the original place. It doesn't matter if such a subplan is the whole plan of a scalar subquery expression or it is a non-grouping aggegate node somewhere in the middle of the plan. Actually these 2 do overlap in many cases as the root node of a scalar subquery expression is an Aggregate node in most of the cases. So it doesn't make sense to handle them differently in 2 rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I deliberately chose a generic new rule name like MergeSubplans instead of something specific like MergeOneRowResultSubplans, as I'm hoping that I can continue improving the rule in the future with SPARK-40193 / #37630 to be able to merge different filters; and then extend the rule to handle subplans that return multiple rows.
| // `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing | ||
| // complicated. | ||
| conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeScalarSubqueries.ruleName) | ||
| conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeSubplans.ruleName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also rings a bell to me. Is there a fallback plan for this improvement? For example, can we exclude this new implement without a regression which means keeping the existing MergeScalarSubqueries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I can add a flag to restore the old behaviour, but having one common rule seems more reasonable to me.
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Outdated
Show resolved
Hide resolved
...alyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
Show resolved
Hide resolved
| } | ||
| } | ||
|
|
||
| test("Merge non-correlated scalar subqueries") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plan merging related tests are moved to sql/PlanMergeSuite.scala.
| } | ||
| } | ||
|
|
||
| test("Merge non-grouping aggregates") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The following 3 tests are new.
|
I merged the following. Could you rebase once more? |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @peter-toth .
|
Thank you @dongjoon-hyun and @yaooqinn for the review. I let the PR open for a few more days and plan on merging it early next week. |
What changes were proposed in this pull request?
This PR renames
MergeScalarSubqueriesrule toMergeSubplansand extends plan merging capabilities to non-grouping aggregate subplans, which are very similar to scalar subqueries in terms they return one row result.Consider the following query that joins 2 non-grouping aggregates:
with the improved rule the plan is optimized to:
so as to scan
Relationonly once.Please note that the above plan where the 2 aggregations are part of a "join group" could be rewritten as one aggregate without the need to introduce a CTE and keeping the join. But there are more complex cases when the proposed CTE based approach is the only viable option. Such cases include when the aggregates reside at different parts of plan, maybe even in diffrent subquery expressions.
E.g. the following query:
can be optimized to:
Why are the changes needed?
To improve plan merging logic to further reduce redundant IO.
Please also note that TPCDS q28 and q88 contain non-grouping aggregates, but this PR can't deal with them yet. Those queries will improve once SPARK-40193 / #37630 lands in Spark.
Does this PR introduce any user-facing change?
Yes.
How was this patch tested?
Existing and new UTs.
Was this patch authored or co-authored using generative AI tooling?
No.