@peter-toth
Contributor

@peter-toth peter-toth commented Nov 12, 2025

What changes were proposed in this pull request?

This PR renames the MergeScalarSubqueries rule to MergeSubplans and extends plan merging to non-grouping aggregate subplans, which resemble scalar subqueries in that they return a single-row result.

Consider the following query that joins 2 non-grouping aggregates:

Join Inner
:- Aggregate [min(a) AS min(a)]
:  +- Relation [a, b, c]
+- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
   +- Relation [a, b, c]

with the improved rule the plan is optimized to:

WithCTE
:- CTERelationDef 0
:  +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
:     +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
:        +- Relation [a, b, c]
+- Join Inner
   :- Project [scalar-subquery [].min(a) AS min(a)]
   :  :  +- CTERelationRef 0
   :  +- OneRowRelation
   +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
      :  :- CTERelationRef 0
      :  +- CTERelationRef 0
      +- OneRowRelation

so as to scan Relation only once.

Please note that the above plan, where the 2 aggregations are part of a "join group", could be rewritten as one aggregate without introducing a CTE or keeping the join. But there are more complex cases where the proposed CTE-based approach is the only viable option, such as when the aggregates reside in different parts of the plan, maybe even in different subquery expressions.

E.g. the following query:

Join Inner
:- Project [scalar-subquery [] AS scalarsubquery()]
:  :  +- Aggregate [min(a) AS min(a)]
:  :     +- Relation [a, b, c]
:  +- OneRowRelation
+- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
   +- Relation [a, b, c]

can be optimized to:

WithCTE
:- CTERelationDef 0
:  +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
:     +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
:        +- Relation [a, b, c]
+- Join Inner
   :- Project [scalar-subquery [].min(a) AS scalarsubquery()]
   :  :  +- CTERelationRef 0
   :  +- OneRowRelation
   +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
      :  :- CTERelationRef 0
      :  +- CTERelationRef 0
      +- OneRowRelation
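
To illustrate why the merged plan needs to read Relation only once, here is a toy Scala sketch. It is not Catalyst code: `Row`, `Merged` and `mergedAggregate` are hypothetical names made up for this illustration, and the sketch assumes a non-empty input (real SQL aggregates would return NULLs for an empty relation instead of failing).

```scala
object SingleScanSketch {
  // Toy row type standing in for Relation [a, b, c]; NOT a Spark class.
  final case class Row(a: Int, b: Long, c: Double)

  // Stand-in for the `mergedValue` struct produced by the CTE definition.
  final case class Merged(minA: Int, sumB: Long, avgC: Double)

  // The merged aggregate: a single pass over the rows computes min(a), sum(b)
  // and avg(c) together, instead of one pass per original aggregate.
  def mergedAggregate(rows: Seq[Row]): Merged = {
    require(rows.nonEmpty, "the toy model assumes a non-empty relation")
    val (minA, sumB, sumC) = rows.foldLeft((Int.MaxValue, 0L, 0.0)) {
      case ((mn, sb, sc), r) => (math.min(mn, r.a), sb + r.b, sc + r.c)
    }
    Merged(minA, sumB, sumC / rows.size)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(3, 10L, 1.0), Row(1, 20L, 2.0), Row(2, 30L, 3.0))
    val merged = mergedAggregate(rows)      // plays the role of CTERelationDef 0
    val left = merged.minA                  // like Project [scalar-subquery [].min(a)]
    val right = (merged.sumB, merged.avgC)  // like Project [...sum(b), ...avg(c)]
    println(s"left=$left, right=$right")
  }
}
```

Both sides of the join only pick fields out of the shared one-row result, which mirrors how the `CTERelationRef 0` references extract fields from `mergedValue`.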

Why are the changes needed?

To improve plan merging logic to further reduce redundant IO.

Please also note that TPCDS q28 and q88 contain non-grouping aggregates, but this PR can't deal with them yet. Those queries will improve once SPARK-40193 / #37630 lands in Spark.

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

Existing and new UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Nov 12, 2025
@peter-toth
Contributor Author

@cloud-fan, @dongjoon-hyun, @beliefer, this is the next step with which I would like to improve Spark's plan merging logic.

It is similar to the previous #42223 attempt, but uses the common plan merging logic I extracted to PlanMerger in #52835, and the improvement it brings is not limited to "join groups".

@@ -1,241 +0,0 @@
/*
Contributor Author

Unfortunately GitHub doesn't recognize the rename of this rule file from MergeScalarSubqueries.scala to MergeSubplans.scala, but the 2 files have a lot in common.

Contributor Author

@peter-toth peter-toth Nov 13, 2025

I reverted the scala docs change of MergeSubplans in 335b593 for now to make the 2 files much more similar. GitHub now recognizes the file rename and shows the real diff, which should make the review much easier.

I can update the scala docs to the following in a follow-up PR:

/**
 * This rule tries to merge multiple subplans that return a one-row result. Such a subplan can be
 * either the plan tree of a [[ScalarSubquery]] expression or the plan tree starting at a
 * non-grouping [[Aggregate]] node.
 *
 * The process is the following:
 * - While traversing the plan, the rule tries to merge each one-row subplan into the already seen
 *   one-row subplans using `PlanMerger`s.
 *   During this first traversal each [[ScalarSubquery]] expression is replaced with a temporary
 *   [[ScalarSubqueryReference]] and each non-grouping [[Aggregate]] node is replaced with a
 *   temporary [[NonGroupingAggregateReference]] pointing to its possibly merged version in
 *   `PlanMerger`s.
 *   `PlanMerger`s keep track of whether a plan is the result of merging 2 or more subplans, or is
 *   an original unmerged plan.
 *   [[ScalarSubqueryReference]]s and [[NonGroupingAggregateReference]]s contain all the required
 *   information to either restore the original subplan or create a reference to a merged CTE.
 * - Once the first traversal is complete and all possible merges have been done, a second
 *   traversal removes the references, either restoring the original subplans or replacing them
 *   with modified ones that reference a CTE with a merged plan.
 *   A modified [[ScalarSubquery]] is constructed like:
 *   `GetStructField(ScalarSubquery(CTERelationRef to the merged plan), merged output index)`
 *   and a modified [[Aggregate]] is constructed like:
 *   ```
 *   Project(
 *     Seq(
 *       GetStructField(
 *         ScalarSubquery(CTERelationRef to the merged plan),
 *         merged output index 1),
 *       GetStructField(
 *         ScalarSubquery(CTERelationRef to the merged plan),
 *         merged output index 2),
 *       ...),
 *     OneRowRelation)
 *   ```
 *   where each `merged output index` is the index of the CTE output attribute that corresponds
 *   to an output of the original node.
 * - If there are merged subqueries in `PlanMerger`s then a `WithCTE` node is built from these
 *   queries. The `CTERelationDef` nodes contain the merged subplans in the following form:
 *   `Project(Seq(CreateNamedStruct(name1, attribute1, ...) AS mergedValue), mergedSubplan)`.
 *   The definitions are flagged as hosting a subplan that can return at most one row.
 *
 * Here are a few examples:
 *
 * 1. a query with 2 subqueries:
 * ```
 * Project [scalar-subquery [] AS scalarsubquery(), scalar-subquery [] AS scalarsubquery()]
 * :  :- Aggregate [min(a) AS min(a)]
 * :  :  +- Relation [a, b, c]
 * :  +- Aggregate [sum(b) AS sum(b)]
 * :     +- Relation [a, b, c]
 * +- OneRowRelation
 * ```
 * is optimized to:
 * ```
 * WithCTE
 * :- CTERelationDef 0
 * :  +- Project [named_struct(min(a), min(a), sum(b), sum(b)) AS mergedValue]
 * :     +- Aggregate [min(a) AS min(a), sum(b) AS sum(b)]
 * :        +- Relation [a, b, c]
 * +- Project [scalar-subquery [].min(a) AS scalarsubquery(),
 *             scalar-subquery [].sum(b) AS scalarsubquery()]
 *    :  :- CTERelationRef 0
 *    :  +- CTERelationRef 0
 *    +- OneRowRelation
 * ```
 *
 * 2. a query with 2 non-grouping aggregates:
 * ```
 * Join Inner
 * :- Aggregate [min(a) AS min(a)]
 * :  +- Relation [a, b, c]
 * +- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
 *    +- Relation [a, b, c]
 * ```
 * is optimized to:
 * ```
 * WithCTE
 * :- CTERelationDef 0
 * :  +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
 * :     +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
 * :        +- Relation [a, b, c]
 * +- Join Inner
 *    :- Project [scalar-subquery [].min(a) AS min(a)]
 *    :  :  +- CTERelationRef 0
 *    :  +- OneRowRelation
 *    +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
 *       :  :- CTERelationRef 0
 *       :  +- CTERelationRef 0
 *       +- OneRowRelation
 * ```
 *
 * 3. a query with a subquery and a non-grouping aggregate:
 * ```
 * Join Inner
 * :- Project [scalar-subquery [] AS scalarsubquery()]
 * :  :  +- Aggregate [min(a) AS min(a)]
 * :  :     +- Relation [a, b, c]
 * :  +- OneRowRelation
 * +- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
 *    +- Relation [a, b, c]
 * ```
 * is optimized to:
 * ```
 * WithCTE
 * :- CTERelationDef 0
 * :  +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
 * :     +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
 * :        +- Relation [a, b, c]
 * +- Join Inner
 *    :- Project [scalar-subquery [].min(a) AS scalarsubquery()]
 *    :  :  +- CTERelationRef 0
 *    :  +- OneRowRelation
 *    +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
 *       :  :- CTERelationRef 0
 *       :  +- CTERelationRef 0
 *       +- OneRowRelation
 * ```
 */
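
The two traversals described in the scala docs above can be sketched roughly as follows. This is a simplified toy model under stated assumptions, not the actual rule: `Subplan`, `Reference`, `Entry`, `Restored`, `CteFields`, `firstPass` and `secondPass` are all hypothetical names, subplans are considered mergeable only when they read the same source, and the real `PlanMerger` logic is considerably more involved.

```scala
object TwoPassMergeSketch {
  // Toy stand-ins; these are NOT Spark's Catalyst or PlanMerger classes.
  final case class Subplan(source: String, aggregateExprs: Seq[String])

  // What the first traversal leaves in place of each one-row subplan.
  final case class Reference(mergerIndex: Int, original: Subplan)

  // What the second traversal produces for each reference.
  sealed trait Resolved
  final case class Restored(original: Subplan) extends Resolved
  final case class CteFields(cteId: Int, fieldIndexes: Seq[Int]) extends Resolved

  // One entry per distinct source; `merged` turns true once 2+ subplans share it.
  final case class Entry(source: String, exprs: Vector[String], merged: Boolean)

  // First pass: try to merge each subplan into an already seen one and leave a
  // reference that remembers the original subplan.
  def firstPass(subplans: Seq[Subplan]): (Vector[Entry], Vector[Reference]) =
    subplans.foldLeft((Vector.empty[Entry], Vector.empty[Reference])) {
      case ((entries, refs), plan) =>
        entries.indexWhere(_.source == plan.source) match {
          case -1 =>
            val entry = Entry(plan.source, plan.aggregateExprs.toVector, merged = false)
            (entries :+ entry, refs :+ Reference(entries.size, plan))
          case i =>
            val e = entries(i)
            val updated =
              e.copy(exprs = (e.exprs ++ plan.aggregateExprs).distinct, merged = true)
            (entries.updated(i, updated), refs :+ Reference(i, plan))
        }
    }

  // Second pass: restore unmerged originals, or point merged ones at the fields
  // of a CTE. CTE ids are handed out only to merged entries, in order.
  def secondPass(entries: Vector[Entry], refs: Seq[Reference]): Seq[Resolved] =
    refs.map { ref =>
      val entry = entries(ref.mergerIndex)
      if (entry.merged) {
        val cteId = entries.take(ref.mergerIndex).count(_.merged)
        CteFields(cteId, ref.original.aggregateExprs.map(entry.exprs.indexOf(_)))
      } else {
        Restored(ref.original)
      }
    }

  def main(args: Array[String]): Unit = {
    val subplans = Seq(
      Subplan("t", Seq("min(a)")),
      Subplan("u", Seq("max(x)")),
      Subplan("t", Seq("sum(b)", "avg(c)")))
    val (entries, refs) = firstPass(subplans)
    secondPass(entries, refs).foreach(println)
  }
}
```

In this sketch the subplan over `u` has nothing to merge with, so it is restored unchanged, while the two subplans over `t` both end up referencing fields of the same merged definition.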

Member

@dongjoon-hyun dongjoon-hyun left a comment

Thank you for pinging me, @peter-toth .

@dongjoon-hyun
Member

cc @yaooqinn, @LuciferYang , @wangyum , too.

- Batch("MergeScalarSubqueries", Once,
- MergeScalarSubqueries,
+ Batch("MergeSubplans", Once,
+ MergeSubplans,
Member

While looking at the code, I became curious. Can we have this improvement as a new rule, @peter-toth ?

Contributor Author

@peter-toth peter-toth Nov 12, 2025

This is a very good question and I was thinking about it a lot too. Actually, I initially tried to add a new rule that handles Aggregate nodes without grouping expressions in join groups only, but then I realized that having more rules just adds more complexity.
Our goal is to recognize mergeable subparts of the whole plan, extract them to CTEs, and just reference them at their original places. It doesn't matter whether such a subplan is the whole plan of a scalar subquery expression or a non-grouping aggregate node somewhere in the middle of the plan. Actually, these 2 overlap in many cases, as the root node of a scalar subquery expression is an Aggregate node most of the time. So it doesn't make sense to handle them differently in 2 rules.

Contributor Author

@peter-toth peter-toth Nov 12, 2025

BTW, I deliberately chose a generic new rule name like MergeSubplans instead of something specific like MergeOneRowResultSubplans, as I'm hoping that I can continue improving the rule in the future with SPARK-40193 / #37630 to be able to merge different filters; and then extend the rule to handle subplans that return multiple rows.

  // `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing
  // complicated.
- conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeScalarSubqueries.ruleName)
+ conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeSubplans.ruleName)
Member

This also rings a bell for me. Is there a fallback plan for this improvement? For example, can we exclude this new implementation without a regression, which would mean keeping the existing MergeScalarSubqueries?

Contributor Author

Well, I can add a flag to restore the old behaviour, but having one common rule seems more reasonable to me.

}
}

test("Merge non-correlated scalar subqueries") {
Contributor Author

Plan merging related tests are moved to sql/PlanMergeSuite.scala.

}
}

test("Merge non-grouping aggregates") {
Contributor Author

The following 3 tests are new.

@dongjoon-hyun
Member

I merged the following. Could you rebase once more?

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you, @peter-toth .

@peter-toth
Contributor Author

Thank you @dongjoon-hyun and @yaooqinn for the review.

I'll leave the PR open for a few more days and plan to merge it early next week.
