Describe the bug
The optimizer crashes in InlineCTE or another rule because of a broken CTERelationRef. This only happens for plans with nested WithCTE nodes.
Reproduction
Nested CTEs are easily created with temp views and successive SQL executions. First, we define a temp view COMMON with the following SQL, where INPUT is any dataset with columns A, B and VALUE:
WITH CTE_COMMON as (
SELECT A, B, SUM(VALUE) as VALUE from INPUT group by A, B)
SELECT * FROM CTE_COMMON
This is used to create dataset A:
WITH CTE_A as (
SELECT A, SUM(VALUE) as VALUE from COMMON group by A)
SELECT A as ID, VALUE FROM CTE_A
and dataset B:
WITH CTE_B as (
SELECT B, SUM(VALUE) as VALUE from COMMON group by B)
SELECT B as ID, VALUE FROM CTE_B
Then we take the UNION of both datasets (in Java):
Dataset<Row> myUnion = a.union(b);
Now, any count or explain operation on myUnion will fail with:
org.apache.spark.SparkException: [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.CombineUnions in batch Union generated an invalid plan: The plan was previously resolved and now became unresolved. SQLSTATE: XXKD0
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:272)
[...]
Environment
- Spark version: 4.1.1 or 4.2.0-preview5
- Java version: JDK 17.0.18
Root Cause
After some debugging, we found that NormalizeCTEIds breaks one of the CTE references.
Before normalization (myUnion.queryExecution().analyzed()):
Union false, false
:- WithCTE
:  :- CTERelationDef 1, false
:  :  +- SubqueryAlias CTE_A
:  :     +- Aggregate [A#6L], [A#6L, sum(VALUE#9) AS VALUE#12]
:  :        +- SubqueryAlias common
:  :           +- View (`COMMON`, [A#6L, B#7L, VALUE#9])
:  :              +- WithCTE
:  :                 :- CTERelationDef 0, false
:  :                 :  +- SubqueryAlias CTE_COMMON
:  :                 :     +- Aggregate [A#6L, B#7L], [A#6L, B#7L, sum(VALUE#8) AS VALUE#9]
:  :                 :        +- SubqueryAlias input
:  :                 :           +- View (`INPUT`, [A#6L, B#7L, VALUE#8])
:  :                 :              +- Relation [A#6L,B#7L,VALUE#8] json
:  :                 +- Project [A#6L, B#7L, VALUE#9]
:  :                    +- SubqueryAlias CTE_COMMON
:  :                       +- CTERelationRef 0, true, [A#6L, B#7L, VALUE#9], false, false
:  +- Project [A#14L AS ID#11L, VALUE#15]
:     +- SubqueryAlias CTE_A
:        +- CTERelationRef 1, true, [A#14L, VALUE#15], false, false
+- WithCTE
   :- CTERelationDef 2, false
   :  +- SubqueryAlias CTE_B
   :     +- Aggregate [B#36L], [B#36L, sum(VALUE#37) AS VALUE#22]
   :        +- SubqueryAlias common
   :           +- View (`COMMON`, [A#35L, B#36L, VALUE#37])
   :              +- WithCTE
   :                 :- CTERelationDef 0, false
   :                 :  +- SubqueryAlias CTE_COMMON
   :                 :     +- Aggregate [A#31L, B#32L], [A#31L, B#32L, sum(VALUE#33) AS VALUE#34]
   :                 :        +- SubqueryAlias input
   :                 :           +- View (`INPUT`, [A#31L, B#32L, VALUE#33])
   :                 :              +- Relation [A#31L,B#32L,VALUE#33] json
   :                 +- Project [A#35L, B#36L, VALUE#37]
   :                    +- SubqueryAlias CTE_COMMON
   :                       +- CTERelationRef 0, true, [A#35L, B#36L, VALUE#37], false, false
   +- Project [B#24L AS ID#21L, VALUE#25]
      +- SubqueryAlias CTE_B
         +- CTERelationRef 2, true, [B#24L, VALUE#25], false, false
After normalization (myUnion.queryExecution().normalized()):
Union false, false
:- WithCTE
:  :- CTERelationDef 0, false
:  :  +- SubqueryAlias CTE_A
:  :     +- Aggregate [A#6L], [A#6L, sum(VALUE#9) AS VALUE#12]
:  :        +- SubqueryAlias common
:  :           +- View (`COMMON`, [A#6L, B#7L, VALUE#9])
:  :              +- WithCTE
:  :                 :- CTERelationDef 1, false
:  :                 :  +- SubqueryAlias CTE_COMMON
:  :                 :     +- Aggregate [A#6L, B#7L], [A#6L, B#7L, sum(VALUE#8) AS VALUE#9]
:  :                 :        +- SubqueryAlias input
:  :                 :           +- View (`INPUT`, [A#6L, B#7L, VALUE#8])
:  :                 :              +- Relation [A#6L,B#7L,VALUE#8] json
:  :                 +- Project [A#6L, B#7L, VALUE#9]
:  :                    +- SubqueryAlias CTE_COMMON
:  :                       +- CTERelationRef 1, true, [A#6L, B#7L, VALUE#9], false, false
:  +- Project [A#14L AS ID#11L, VALUE#15]
:     +- SubqueryAlias CTE_A
:        +- CTERelationRef 0, true, [A#14L, VALUE#15], false, false
+- WithCTE
   :- CTERelationDef 2, false
   :  +- SubqueryAlias CTE_B
   :     +- Aggregate [B#36L], [B#36L, sum(VALUE#37) AS VALUE#22]
   :        +- SubqueryAlias common
   :           +- View (`COMMON`, [A#35L, B#36L, VALUE#37])
   :              +- WithCTE
   :                 :- CTERelationDef 1, false
   :                 :  +- SubqueryAlias CTE_COMMON
   :                 :     +- Aggregate [A#31L, B#32L], [A#31L, B#32L, sum(VALUE#33) AS VALUE#34]
   :                 :        +- SubqueryAlias input
   :                 :           +- View (`INPUT`, [A#31L, B#32L, VALUE#33])
   :                 :              +- Relation [A#31L,B#32L,VALUE#33] json
   :                 +- Project [A#35L, B#36L, VALUE#37]
   :                    +- SubqueryAlias CTE_COMMON
   :                       +- CTERelationRef 0, true, [A#35L, B#36L, VALUE#37], false, false   ** broken, should be 1 **
   +- Project [B#24L AS ID#21L, VALUE#25]
      +- SubqueryAlias CTE_B
         +- CTERelationRef 2, true, [B#24L, VALUE#25], false, false
NormalizeCTEIds swaps the ids 0 and 1, but applies this mapping multiple times to references below nested WithCTE nodes. The second CTERelationRef 0 is therefore rewritten twice, 0 -> 1 -> 0, while its CTERelationDef is renumbered only once, to 1.
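To make the double rewrite easier to follow, here is a toy Java model of the renumbering as described above. It is not Spark's NormalizeCTEIds source: CteNode and BuggyCteRenumbering are hypothetical classes, a plan is reduced to WithCTE, definition and reference nodes, and the model assumes a single old-to-new id map shared across the whole plan that every WithCTE applies to all references in its subtree during a top-down traversal. Run on the shape of myUnion, it yields the same ids as the normalized plan, including the reference that ends up as 0 while its definition becomes 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy plan node: only CTE definitions (DEF) and references (REF) carry an id. */
final class CteNode {
    enum Kind { WITH_CTE, DEF, REF, OTHER }

    final Kind kind;
    long cteId; // only meaningful for DEF and REF nodes
    final List<CteNode> children;

    private CteNode(Kind kind, long cteId, CteNode... children) {
        this.kind = kind;
        this.cteId = cteId;
        this.children = List.of(children);
    }

    // A WITH_CTE node lists its definitions first and its main plan last.
    static CteNode withCte(CteNode... defsThenPlan) { return new CteNode(Kind.WITH_CTE, -1, defsThenPlan); }
    static CteNode def(long id, CteNode body) { return new CteNode(Kind.DEF, id, body); }
    static CteNode ref(long id) { return new CteNode(Kind.REF, id); }
    static CteNode other(CteNode... children) { return new CteNode(Kind.OTHER, -1, children); }

    /** DEF/REF ids in pre-order, used to compare plans. */
    static List<Long> ids(CteNode node) {
        List<Long> out = new ArrayList<>();
        collect(node, out);
        return out;
    }

    private static void collect(CteNode node, List<Long> out) {
        if (node.kind == Kind.DEF || node.kind == Kind.REF) out.add(node.cteId);
        for (CteNode child : node.children) collect(child, out);
    }

    /** Shape of myUnion: Union(WithCTE for CTE_A, WithCTE for CTE_B), each nesting COMMON's WithCTE. */
    static CteNode reportPlan() {
        CteNode a = withCte(def(1, other(withCte(def(0, other()), ref(0)))), ref(1));
        CteNode b = withCte(def(2, other(withCte(def(0, other()), ref(0)))), ref(2));
        return other(a, b);
    }
}

/** Hypothetical model of the renumbering described in this report, NOT Spark's code. */
final class BuggyCteRenumbering {
    private long nextId = 0;
    private final Map<Long, Long> oldToNew = new HashMap<>(); // shared by the whole plan

    void normalize(CteNode node) {
        if (node.kind == CteNode.Kind.WITH_CTE) {
            for (CteNode child : node.children) {
                if (child.kind == CteNode.Kind.DEF) {
                    // Reuse the new id if this old id was seen before, else allocate the next one.
                    child.cteId = oldToNew.computeIfAbsent(child.cteId, k -> nextId++);
                    rewriteRefs(child.children.get(0)); // also reaches refs inside nested WithCTEs
                } else {
                    rewriteRefs(child); // main plan of this WithCTE
                }
            }
        }
        // Keep descending: a nested WithCTE rewrites the refs below it a second time.
        for (CteNode child : node.children) normalize(child);
    }

    private void rewriteRefs(CteNode node) {
        if (node.kind == CteNode.Kind.REF && oldToNew.containsKey(node.cteId)) {
            node.cteId = oldToNew.get(node.cteId);
        }
        for (CteNode child : node.children) rewriteRefs(child);
    }

    public static void main(String[] args) {
        CteNode plan = CteNode.reportPlan();
        System.out.println("before: " + CteNode.ids(plan)); // [1, 0, 0, 1, 2, 0, 0, 2]
        new BuggyCteRenumbering().normalize(plan);
        System.out.println("after:  " + CteNode.ids(plan)); // [0, 1, 1, 0, 2, 1, 0, 2]
    }
}
```

In this model, the nested reference in the CTE_A branch survives because, when that branch's outer WithCTE is processed, the map only contains 1 -> 0. In the CTE_B branch the map already holds 0 -> 1 and 1 -> 0, so the nested reference is flipped once by the outer WithCTE and flipped back by the inner one.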
Workaround
Use temp views or subqueries instead of CTEs. However, we have many fairly complex SQL files, and rewriting them this way would hurt maintainability.
Another solution would be to call canonicalizeCTE() in NormalizeCTEIds only once, e.g. at the end of apply(). This works for us, but it may break caching (the reason NormalizeCTEIds was introduced). CTEs are also renumbered in NormalizePlan, which may be a better place for the fix.
I am afraid that a more experienced Scala/Spark developer is needed here.
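The "apply the mapping only once" idea can be sketched with a small hypothetical Java model (CteNode and TwoPhaseCteRenumbering are illustrative names, not Spark classes, and this is not a proposed Spark patch): first walk the plan in pre-order and assign new ids to all definitions, then rewrite every definition and reference exactly once using the completed map. It assumes, as the shared map in NormalizeCTEIds appears to, that an original id denotes the same CTE wherever it appears, and it says nothing about the caching concern mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy plan node: only CTE definitions (DEF) and references (REF) carry an id. */
final class CteNode {
    enum Kind { WITH_CTE, DEF, REF, OTHER }

    final Kind kind;
    long cteId; // only meaningful for DEF and REF nodes
    final List<CteNode> children;

    private CteNode(Kind kind, long cteId, CteNode... children) {
        this.kind = kind;
        this.cteId = cteId;
        this.children = List.of(children);
    }

    // A WITH_CTE node lists its definitions first and its main plan last.
    static CteNode withCte(CteNode... defsThenPlan) { return new CteNode(Kind.WITH_CTE, -1, defsThenPlan); }
    static CteNode def(long id, CteNode body) { return new CteNode(Kind.DEF, id, body); }
    static CteNode ref(long id) { return new CteNode(Kind.REF, id); }
    static CteNode other(CteNode... children) { return new CteNode(Kind.OTHER, -1, children); }

    /** DEF/REF ids in pre-order, used to compare plans. */
    static List<Long> ids(CteNode node) {
        List<Long> out = new ArrayList<>();
        collect(node, out);
        return out;
    }

    private static void collect(CteNode node, List<Long> out) {
        if (node.kind == Kind.DEF || node.kind == Kind.REF) out.add(node.cteId);
        for (CteNode child : node.children) collect(child, out);
    }

    /** Shape of myUnion: Union(WithCTE for CTE_A, WithCTE for CTE_B), each nesting COMMON's WithCTE. */
    static CteNode reportPlan() {
        CteNode a = withCte(def(1, other(withCte(def(0, other()), ref(0)))), ref(1));
        CteNode b = withCte(def(2, other(withCte(def(0, other()), ref(0)))), ref(2));
        return other(a, b);
    }
}

/** Hypothetical two-phase renumbering: build the full mapping first, then apply it once per node. */
final class TwoPhaseCteRenumbering {
    private long nextId = 0;
    private final Map<Long, Long> oldToNew = new HashMap<>();

    void normalize(CteNode plan) {
        assignIds(plan); // phase 1: complete old -> new mapping, in pre-order of definitions
        rewrite(plan);   // phase 2: every DEF and REF is visited exactly once
    }

    private void assignIds(CteNode node) {
        if (node.kind == CteNode.Kind.DEF) {
            oldToNew.computeIfAbsent(node.cteId, k -> nextId++);
        }
        for (CteNode child : node.children) assignIds(child);
    }

    private void rewrite(CteNode node) {
        boolean hasId = node.kind == CteNode.Kind.DEF || node.kind == CteNode.Kind.REF;
        if (hasId && oldToNew.containsKey(node.cteId)) {
            node.cteId = oldToNew.get(node.cteId);
        }
        for (CteNode child : node.children) rewrite(child);
    }

    public static void main(String[] args) {
        CteNode plan = CteNode.reportPlan();
        new TwoPhaseCteRenumbering().normalize(plan);
        System.out.println(CteNode.ids(plan)); // [0, 1, 1, 0, 2, 1, 1, 2]
    }
}
```

Because each node is rewritten at most once, a reference can no longer be flipped back, and both nested CTE_COMMON definitions and their references end up as 1.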