From 61aca57a2733d85e57a07330829c6e70ac9527a6 Mon Sep 17 00:00:00 2001 From: shrirangmhalgi Date: Fri, 15 May 2026 00:45:32 -0700 Subject: [PATCH 1/2] [SPARK-48091][SQL] Preserve aliases inside lambda when ExtractGenerator restructures plan ExtractGenerator called trimNonTopLevelAliases on all project list items before extracting the generator. This stripped aliases inside lambda functions (e.g., struct(x.as("data"))) before they could be resolved into struct field names by CreateStruct. Now only uses trimNonTopLevelAliases for pattern matching to detect generators, but preserves the original untrimmed expression for non-generator project items. --- .../sql/catalyst/analysis/Analyzer.scala | 10 +++--- .../spark/sql/GeneratorFunctionSuite.scala | 31 +++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f31354179674e..c7fe055ec2f6c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3260,9 +3260,11 @@ class Analyzer( // The star will be expanded differently if we insert `Generate` under `Project` too early. case p @ Project(projectList, child) if !projectList.exists(_.exists(_.isInstanceOf[Star])) => val (resolvedGenerator, newProjectList) = projectList - .map(trimNonTopLevelAliases) .foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { (res, e) => - e match { + // SPARK-48091: Only trim aliases on the generator expression itself. Trimming + // non-generator expressions strips aliases inside lambda functions (e.g., + // struct(x.as("data"))) before they can be resolved into struct field names. 
+ trimNonTopLevelAliases(e) match { // If there are more than one generator, we only rewrite the first one and wait for // the next analyzer iteration to rewrite the next one. case AliasedGenerator(generator, names, outer) if res._1.isEmpty && @@ -3275,8 +3277,8 @@ class Analyzer( generatorOutput = GeneratorResolution.makeGeneratorOutput(generator, names), child) (Some(g), res._2 ++ g.nullableOutput) - case other => - (res._1, res._2 :+ other) + case _ => + (res._1, res._2 :+ e) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index 015ea9defae94..8469397247d98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -765,6 +765,37 @@ class GeneratorFunctionSuite extends SharedSparkSession { Seq(Row(0, 10, 0, 10), Row(1, 20, 1, 20)) ) } + + test("SPARK-48091: explode with transform should preserve struct field aliases") { + val df = spark.createDataFrame(Seq((1, Array(1, 2, 3), Array(4, 5, 6)))) + .toDF("id", "my_array", "my_array2") + + // Without explode - aliases should work (baseline) + val good = df.select( + transform(col("my_array2"), x => struct(x.as("data"))).as("my_struct") + ) + assert(good.schema("my_struct").dataType.asInstanceOf[types.ArrayType] + .elementType.asInstanceOf[StructType].fieldNames.toSeq === Seq("data")) + + // With explode in same select - aliases should still be preserved + val result = df.select( + explode(col("my_array")).as("exploded"), + transform(col("my_array2"), x => struct(x.as("data"))).as("my_struct") + ) + assert(result.schema("my_struct").dataType.asInstanceOf[types.ArrayType] + .elementType.asInstanceOf[StructType].fieldNames.toSeq === Seq("data")) + + // Multiple aliases inside struct + val result2 = df.select( + explode(col("my_array")).as("exploded"), + transform(col("my_array2"), + x => 
struct(x.as("value"), col("id").as("key")) + ).as("my_struct") + ) + val fields2 = result2.schema("my_struct").dataType.asInstanceOf[types.ArrayType] + .elementType.asInstanceOf[StructType].fieldNames.toSeq + assert(fields2 === Seq("value", "key")) + } } case class EmptyGenerator() extends Generator with LeafLike[Expression] { From dcd5fae3c4ad74e3eaf3db556dfff99b2cd74a81 Mon Sep 17 00:00:00 2001 From: shrirangmhalgi Date: Thu, 21 May 2026 14:48:41 -0700 Subject: [PATCH 2/2] Address review: move fix to trimAliases, revert local ExtractGenerator workaround Per cloud-fan's suggestion, moved the fix from ExtractGenerator to AliasHelper.trimAliases. Added an UnresolvedFunction skip case to preserve alias children that carry struct field names. Also fixed the ArrayType import nit in the test. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++------ .../spark/sql/catalyst/expressions/AliasHelper.scala | 6 +++++- .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 8 ++++---- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c7fe055ec2f6c..f31354179674e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3260,11 +3260,9 @@ class Analyzer( // The star will be expanded differently if we insert `Generate` under `Project` too early. case p @ Project(projectList, child) if !projectList.exists(_.exists(_.isInstanceOf[Star])) => val (resolvedGenerator, newProjectList) = projectList + .map(trimNonTopLevelAliases) .foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { (res, e) => - // SPARK-48091: Only trim aliases on the generator expression itself. 
Trimming - // non-generator expressions strips aliases inside lambda functions (e.g., - // struct(x.as("data"))) before they can be resolved into struct field names. - trimNonTopLevelAliases(e) match { + e match { // If there are more than one generator, we only rewrite the first one and wait for // the next analyzer iteration to rewrite the next one. case AliasedGenerator(generator, names, outer) if res._1.isEmpty && @@ -3277,8 +3275,8 @@ class Analyzer( generatorOutput = GeneratorResolution.makeGeneratorOutput(generator, names), child) (Some(g), res._2 ++ g.nullableOutput) - case _ => - (res._1, res._2 :+ e) + case other => + (res._1, res._2 :+ other) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala index 2340385dcdd66..f1cb20ca40619 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.analysis.MultiAlias +import org.apache.spark.sql.catalyst.analysis.{MultiAlias, UnresolvedFunction} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} import org.apache.spark.sql.catalyst.trees.CurrentOrigin @@ -112,6 +112,10 @@ trait AliasHelper { } protected def trimAliases(e: Expression): Expression = e match { + // SPARK-48091: Do not descend into unresolved function calls. Aliases inside them + // (e.g., UnresolvedFunction("struct", Seq(Alias(x, "data")))) carry semantic information + // that ResolveFunctions -> CreateStruct.apply consumes to produce field names. 
+ case u: UnresolvedFunction => u // The children of `CreateNamedStruct` may use `Alias` to carry metadata and we should not // trim them. case c: CreateNamedStruct => c.mapChildren { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index 8469397247d98..58f399bf797f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.trees.LeafLike import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType} class GeneratorFunctionSuite extends SharedSparkSession { import testImplicits._ @@ -774,7 +774,7 @@ class GeneratorFunctionSuite extends SharedSparkSession { val good = df.select( transform(col("my_array2"), x => struct(x.as("data"))).as("my_struct") ) - assert(good.schema("my_struct").dataType.asInstanceOf[types.ArrayType] + assert(good.schema("my_struct").dataType.asInstanceOf[ArrayType] .elementType.asInstanceOf[StructType].fieldNames.toSeq === Seq("data")) // With explode in same select - aliases should still be preserved @@ -782,7 +782,7 @@ class GeneratorFunctionSuite extends SharedSparkSession { explode(col("my_array")).as("exploded"), transform(col("my_array2"), x => struct(x.as("data"))).as("my_struct") ) - assert(result.schema("my_struct").dataType.asInstanceOf[types.ArrayType] + assert(result.schema("my_struct").dataType.asInstanceOf[ArrayType] .elementType.asInstanceOf[StructType].fieldNames.toSeq === Seq("data")) // Multiple aliases inside struct @@ -792,7 +792,7 @@ class GeneratorFunctionSuite extends SharedSparkSession { x => struct(x.as("value"), col("id").as("key")) 
).as("my_struct") ) - val fields2 = result2.schema("my_struct").dataType.asInstanceOf[types.ArrayType] + val fields2 = result2.schema("my_struct").dataType.asInstanceOf[ArrayType] .elementType.asInstanceOf[StructType].fieldNames.toSeq assert(fields2 === Seq("value", "key")) }