
Optimizer failure: NormalizeCTEIds breaks CTE references for queries with nested CTEs #55799

@hkeller7kfw

Description

Describe the bug

The optimizer crashes in InlineCTE or another rule because of a broken CTERelationRef. This only happens for plans with nested WithCTE nodes.

Reproduction

Nested CTEs are easily created with temp views and successive SQL execution. First, we define a temp view COMMON with the following SQL, where INPUT is any dataset with columns A, B and VALUE:

            WITH CTE_COMMON as (
                SELECT A, B, SUM(VALUE) as VALUE from INPUT group by A, B)
            SELECT * FROM CTE_COMMON

COMMON is then used to create dataset A:

            WITH CTE_A as (
                SELECT A, SUM(VALUE) as VALUE from COMMON group by A)
            SELECT A as ID, VALUE FROM CTE_A

and dataset B:

            WITH CTE_B as (
                SELECT B, SUM(VALUE) as VALUE from COMMON group by B)
            SELECT B as ID, VALUE FROM CTE_B

Then we take the union of both datasets (in Java): Dataset<Row> myUnion = a.union(b);
Any count or explain operation on myUnion now fails with:

org.apache.spark.SparkException: [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.CombineUnions in batch Union generated an invalid plan: The plan was previously resolved and now became unresolved. SQLSTATE: XXKD0
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:272)
        [...]

Environment

  • Spark version: 4.1.1 or 4.2.0-preview5
  • Java version: JDK 17.0.18

Root Cause

After some debugging, we found that NormalizeCTEIds breaks one of the CTE references.
Before normalization (myUnion.queryExecution().analyzed()):

Union false, false
:- WithCTE
:  :- CTERelationDef 1, false
:  :  +- SubqueryAlias CTE_A
:  :     +- Aggregate [A#6L], [A#6L, sum(VALUE#9) AS VALUE#12]
:  :        +- SubqueryAlias common
:  :           +- View (`COMMON`, [A#6L, B#7L, VALUE#9])
:  :              +- WithCTE
:  :                 :- CTERelationDef 0, false
:  :                 :  +- SubqueryAlias CTE_COMMON
:  :                 :     +- Aggregate [A#6L, B#7L], [A#6L, B#7L, sum(VALUE#8) AS VALUE#9]
:  :                 :        +- SubqueryAlias input
:  :                 :           +- View (`INPUT`, [A#6L, B#7L, VALUE#8])
:  :                 :              +- Relation [A#6L,B#7L,VALUE#8] json
:  :                 +- Project [A#6L, B#7L, VALUE#9]
:  :                    +- SubqueryAlias CTE_COMMON
:  :                       +- CTERelationRef 0, true, [A#6L, B#7L, VALUE#9], false, false
:  +- Project [A#14L AS ID#11L, VALUE#15]
:     +- SubqueryAlias CTE_A
:        +- CTERelationRef 1, true, [A#14L, VALUE#15], false, false
+- WithCTE
   :- CTERelationDef 2, false
   :  +- SubqueryAlias CTE_B
   :     +- Aggregate [B#36L], [B#36L, sum(VALUE#37) AS VALUE#22]
   :        +- SubqueryAlias common
   :           +- View (`COMMON`, [A#35L, B#36L, VALUE#37])
   :              +- WithCTE
   :                 :- CTERelationDef 0, false
   :                 :  +- SubqueryAlias CTE_COMMON
   :                 :     +- Aggregate [A#31L, B#32L], [A#31L, B#32L, sum(VALUE#33) AS VALUE#34]
   :                 :        +- SubqueryAlias input
   :                 :           +- View (`INPUT`, [A#31L, B#32L, VALUE#33])
   :                 :              +- Relation [A#31L,B#32L,VALUE#33] json
   :                 +- Project [A#35L, B#36L, VALUE#37]
   :                    +- SubqueryAlias CTE_COMMON
   :                       +- CTERelationRef 0, true, [A#35L, B#36L, VALUE#37], false, false
   +- Project [B#24L AS ID#21L, VALUE#25]
      +- SubqueryAlias CTE_B
         +- CTERelationRef 2, true, [B#24L, VALUE#25], false, false

After normalization (myUnion.queryExecution().normalized()):

Union false, false
:- WithCTE
:  :- CTERelationDef 0, false
:  :  +- SubqueryAlias CTE_A
:  :     +- Aggregate [A#6L], [A#6L, sum(VALUE#9) AS VALUE#12]
:  :        +- SubqueryAlias common
:  :           +- View (`COMMON`, [A#6L, B#7L, VALUE#9])
:  :              +- WithCTE
:  :                 :- CTERelationDef 1, false
:  :                 :  +- SubqueryAlias CTE_COMMON
:  :                 :     +- Aggregate [A#6L, B#7L], [A#6L, B#7L, sum(VALUE#8) AS VALUE#9]
:  :                 :        +- SubqueryAlias input
:  :                 :           +- View (`INPUT`, [A#6L, B#7L, VALUE#8])
:  :                 :              +- Relation [A#6L,B#7L,VALUE#8] json
:  :                 +- Project [A#6L, B#7L, VALUE#9]
:  :                    +- SubqueryAlias CTE_COMMON
:  :                       +- CTERelationRef 1, true, [A#6L, B#7L, VALUE#9], false, false
:  +- Project [A#14L AS ID#11L, VALUE#15]
:     +- SubqueryAlias CTE_A
:        +- CTERelationRef 0, true, [A#14L, VALUE#15], false, false
+- WithCTE
   :- CTERelationDef 2, false
   :  +- SubqueryAlias CTE_B
   :     +- Aggregate [B#36L], [B#36L, sum(VALUE#37) AS VALUE#22]
   :        +- SubqueryAlias common
   :           +- View (`COMMON`, [A#35L, B#36L, VALUE#37])
   :              +- WithCTE
   :                 :- CTERelationDef 1, false
   :                 :  +- SubqueryAlias CTE_COMMON
   :                 :     +- Aggregate [A#31L, B#32L], [A#31L, B#32L, sum(VALUE#33) AS VALUE#34]
   :                 :        +- SubqueryAlias input
   :                 :           +- View (`INPUT`, [A#31L, B#32L, VALUE#33])
   :                 :              +- Relation [A#31L,B#32L,VALUE#33] json
   :                 +- Project [A#35L, B#36L, VALUE#37]
   :                    +- SubqueryAlias CTE_COMMON
   :                       +- CTERelationRef 0, true, [A#35L, B#36L, VALUE#37], false, false             ** broken, should be 1 **
   +- Project [B#24L AS ID#21L, VALUE#25]
      +- SubqueryAlias CTE_B
         +- CTERelationRef 2, true, [B#24L, VALUE#25], false, false

NormalizeCTEIds swaps ids 0 and 1, but applies this mapping repeatedly to nested WithCTE nodes. As a result, the CTERelationRef 0 in the second branch is rewritten twice, from 0 -> 1 -> 0, and no longer matches its CTERelationDef 1.
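To make the double rewrite concrete, here is a minimal Java model of it. The classes (Node, Scan, Ref, Def, WithCte, Union) and methods (rewrite, normalizeBuggy, collectIds, rightInnerIds) are hypothetical stand-ins, not Spark's classes, and the traversal is an assumption chosen to match the behaviour described above: a shared old-to-new id map grows as WithCTE nodes are visited top-down, and each WithCTE rewrites all refs in its subtree with the whole map before the walk descends into nested WithCTE nodes. The plan mirrors the analyzed plan (CTE_A = 1, CTE_B = 2, CTE_COMMON = 0 in both copies of COMMON). The one-pass variant sketches the "apply the mapping only once" idea from the Workaround section: collect the mapping first, then rewrite every def and ref exactly once.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of WithCTE / CTERelationDef / CTERelationRef.
// These are NOT Spark classes; they only mimic the CTE id renumbering.
public class CteRenumberDemo {
    interface Node {}
    record Scan() implements Node {}
    static final class Ref implements Node { long id; Ref(long id) { this.id = id; } }
    static final class Def {
        long id; final Node child;
        Def(long id, Node child) { this.id = id; this.child = child; }
    }
    static final class WithCte implements Node {
        final List<Def> defs; final Node plan;
        WithCte(List<Def> defs, Node plan) { this.defs = defs; this.plan = plan; }
    }
    record Union(List<Node> children) implements Node {}

    // Rewrites every Ref below n (and every Def too if renameDefs) using map.
    static void rewrite(Node n, Map<Long, Long> map, boolean renameDefs) {
        if (n instanceof Ref r) {
            r.id = map.getOrDefault(r.id, r.id);
        } else if (n instanceof WithCte w) {
            for (Def d : w.defs) {
                if (renameDefs) d.id = map.getOrDefault(d.id, d.id);
                rewrite(d.child, map, renameDefs);
            }
            rewrite(w.plan, map, renameDefs);
        } else if (n instanceof Union u) {
            for (Node c : u.children()) rewrite(c, map, renameDefs);
        }
    }

    // Buggy variant (assumed traversal): each WithCte rewrites the refs in its
    // whole subtree with the shared map, then the walk descends and nested
    // WithCte nodes rewrite the same refs a second time.
    static void normalizeBuggy(Node n, Map<Long, Long> map, long[] next) {
        if (n instanceof WithCte w) {
            for (Def d : w.defs) {
                map.computeIfAbsent(d.id, k -> next[0]++);
                rewrite(d.child, map, false);
            }
            rewrite(w.plan, map, false);
            for (Def d : w.defs) d.id = map.get(d.id);
            for (Def d : w.defs) normalizeBuggy(d.child, map, next);
            normalizeBuggy(w.plan, map, next);
        } else if (n instanceof Union u) {
            for (Node c : u.children()) normalizeBuggy(c, map, next);
        }
    }

    // One-pass variant, step 1: assign new ids in the same top-down order.
    static void collectIds(Node n, Map<Long, Long> map, long[] next) {
        if (n instanceof WithCte w) {
            for (Def d : w.defs) map.computeIfAbsent(d.id, k -> next[0]++);
            for (Def d : w.defs) collectIds(d.child, map, next);
            collectIds(w.plan, map, next);
        } else if (n instanceof Union u) {
            for (Node c : u.children()) collectIds(c, map, next);
        }
    }

    // One branch of the union: an outer CTE whose definition reads the COMMON
    // view, which itself carries CTE_COMMON with id 0.
    static WithCte branch(long outerId) {
        WithCte common = new WithCte(List.of(new Def(0, new Scan())), new Ref(0));
        return new WithCte(List.of(new Def(outerId, common)), new Ref(outerId));
    }

    // Returns {defId, refId} of CTE_COMMON inside the CTE_B branch after normalizing.
    public static long[] rightInnerIds(boolean onePass) {
        WithCte right = branch(2);
        Node plan = new Union(List.of(branch(1), right));
        Map<Long, Long> map = new HashMap<>();
        long[] next = {0};
        if (onePass) {
            collectIds(plan, map, next);
            rewrite(plan, map, true); // step 2: every def and ref rewritten once
        } else {
            normalizeBuggy(plan, map, next);
        }
        WithCte inner = (WithCte) right.defs.get(0).child;
        return new long[] {inner.defs.get(0).id, ((Ref) inner.plan).id};
    }

    public static void main(String[] args) {
        long[] bug = rightInnerIds(false), ok = rightInnerIds(true);
        System.out.println("buggy:    def " + bug[0] + ", ref " + bug[1]); // def 1, ref 0
        System.out.println("one pass: def " + ok[0] + ", ref " + ok[1]);  // def 1, ref 1
    }
}
```

In this model the buggy variant leaves the inner definition of the CTE_B branch at id 1 and its reference at id 0, the same mismatch as in the normalized plan, while the one-pass variant keeps both at 1. Whether Spark's real fix should look like this (or live in NormalizePlan instead) is left open, as in the issue.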

Workaround

Use temp views or subqueries instead of CTEs. However, we have many complex SQL files, and rewriting them would hurt maintainability.
Another solution would be to call canonicalizeCTE() in NormalizeCTEIds only once, perhaps at the end of apply(). This works for us but may break caching (the reason NormalizeCTEIds was introduced). CTEs are also renumbered in NormalizePlan, which may be a better place to fix this.
A more experienced Scala/Spark developer is probably needed here.
