diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala index 888ccf6ec0c9..a51f2d771756 100644 --- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala +++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala @@ -21,6 +21,8 @@ import org.apache.gluten.tags.EnhancedFeaturesTest import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.execution.CommandResultExec +import org.apache.spark.sql.execution.GlutenImplicits._ +import org.apache.spark.sql.execution.datasources.v2.AppendDataExec import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.gluten.TestUtils @@ -383,4 +385,29 @@ class VeloxIcebergSuite extends IcebergSuite { } } } + + test("iceberg native write fallback when validation fails - sort order") { + withTable("iceberg_sorted_tbl") { + spark.sql("CREATE TABLE iceberg_sorted_tbl (a INT, b STRING) USING iceberg") + spark.sql("ALTER TABLE iceberg_sorted_tbl WRITE ORDERED BY a") + + val df = spark.sql("INSERT INTO iceberg_sorted_tbl VALUES (1, 'hello'), (2, 'world')") + + // Should fallback to vanilla Spark's AppendDataExec. + val commandPlan = + df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan + assert(commandPlan.isInstanceOf[AppendDataExec]) + assert(!commandPlan.isInstanceOf[VeloxIcebergAppendDataExec]) + + checkAnswer( + spark.sql("SELECT * FROM iceberg_sorted_tbl ORDER BY a"), + Seq(Row(1, "hello"), Row(2, "world"))) + + // Verify fallbackSummary reports the sort order fallback reason. 
+ val summary = df.fallbackSummary() + assert( + summary.fallbackNodeToReason.exists( + _.values.exists(_.contains("Not support write table with sort order")))) + } + } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala index 29b047095135..6d2d4b88984b 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala @@ -345,7 +345,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values) assert(fallbackReasons.nonEmpty) assert( - fallbackReasons.forall( + fallbackReasons.exists( _.contains("[FallbackByUserOptions] Validation failed on node Sort"))) } finally { spark.sparkContext.removeSparkListener(listener) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala index 89b90af91f16..e42aed58a1cb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.api.plugin.PluginContext import org.apache.spark.internal.Logging import org.apache.spark.softaffinity.SoftAffinityListener +import org.apache.spark.sql.execution.GlutenQueryExecutionListener import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener, GlutenUIUtils} import org.apache.spark.sql.internal.SparkConfigUtil._ @@ -45,6 +46,7 @@ trait SubstraitBackend extends Backend with Logging { // Register Gluten listeners GlutenSQLAppStatusListener.register(sc) + 
GlutenQueryExecutionListener.register(sc) if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) { SoftAffinityListener.register(sc) } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala index 1e5beeb7c1c3..dd115efa542b 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala @@ -91,7 +91,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { def collect(tmp: QueryPlan[_]): Unit = { tmp.foreachUp { case _: ExecutedCommandExec => - case _: CommandResultExec => + case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan) + case p: V2CommandExec + if FallbackTags.nonEmpty(p) || + p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) => + handleVanillaSparkPlan(p, fallbackNodeToReason) case _: V2CommandExec => case _: DataWritingCommandExec => case _: WholeStageCodegenExec => @@ -307,6 +311,14 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { plan.foreachUp { case _: WholeStageCodegenExec => case _: InputAdapter => + case cmd: CommandResultExec => + currentOperationID = generateOperatorIDs( + cmd.commandPhysicalPlan, + currentOperationID, + visited, + reusedExchanges, + addReusedExchanges) + setOpId(cmd) case p: AdaptiveSparkPlanExec => currentOperationID = generateOperatorIDs( p.executedPlan, @@ -353,6 +365,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { getSubqueries(a.executedPlan, subqueries) case q: QueryStageExec => getSubqueries(q.plan, subqueries) + case cmd: CommandResultExec => + getSubqueries(cmd.commandPhysicalPlan, subqueries) case p: SparkPlan => p.expressions.foreach(_.collect { case e: PlanExpression[_] => @@ -383,6 +397,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { plan.foreach { case p: AdaptiveSparkPlanExec => remove(p, 
Seq(p.executedPlan, p.initialPlan)) case p: QueryStageExec => remove(p, Seq(p.plan)) + case cmd: CommandResultExec => remove(cmd, Seq(cmd.commandPhysicalPlan)) case plan: QueryPlan[_] => remove(plan, plan.innerChildren) } } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala index 99c34122d2c3..0ef0b6a28c3b 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala @@ -55,19 +55,24 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession) private def printFallbackReason(plan: SparkPlan): Unit = { val validationLogLevel = glutenConf.validationLogLevel - plan.foreachUp { - case _: GlutenPlan => // ignore - case p: SparkPlan if FallbackTags.nonEmpty(p) => - val tag = FallbackTags.get(p) - logFallbackReason(validationLogLevel, p.nodeName, tag.reason()) - // With in next round stage in AQE, the physical plan would be a new instance that - // can not preserve the tag, so we need to set the fallback reason to logical plan. - // Then we can be aware of the fallback reason for the whole plan. - // If a logical plan mapping to several physical plan, we add all reason into - // that logical plan to make sure we do not lose any fallback reason. 
- p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan, tag)) - case _ => + def printPlan(p: SparkPlan): Unit = { + p.foreachUp { + case _: GlutenPlan => // ignore + case cmd: CommandResultExec => + printPlan(cmd.commandPhysicalPlan) + case p: SparkPlan if FallbackTags.nonEmpty(p) => + val tag = FallbackTags.get(p) + logFallbackReason(validationLogLevel, p.nodeName, tag.reason()) + // Within the next round stage in AQE, the physical plan would be a new instance that + // cannot preserve the tag, so we need to set the fallback reason to logical plan. + // Then we can be aware of the fallback reason for the whole plan. + // If a logical plan maps to several physical plans, we add all reasons into + // that logical plan to make sure we do not lose any fallback reason. + p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan, tag)) + case _ => + } } + printPlan(plan) } private def postFallbackReason(plan: SparkPlan): Unit = { @@ -91,5 +96,3 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession) GlutenUIUtils.postEvent(sc, event) } } - -object GlutenFallbackReporter {} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala index 48710a9edff5..196de672f7f6 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.exception.GlutenException import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer} +import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.PlanUtil import org.apache.spark.sql.{Column, Dataset, SparkSession} @@ -107,7 +108,10 @@ object GlutenImplicits { def collect(tmp: 
QueryPlan[_]): Unit = { tmp.foreachUp { case _: ExecutedCommandExec => - case _: CommandResultExec => + case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan) + case p: V2CommandExec if FallbackTags.nonEmpty(p) || + p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) => + GlutenExplainUtils.handleVanillaSparkPlan(p, fallbackNodeToReason) case _: V2CommandExec => case _: DataWritingCommandExec => case _: WholeStageCodegenExec => diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala new file mode 100644 index 000000000000..32f7ef2cd002 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.gluten.events.GlutenPlanFallbackEvent + +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.execution.ui.{GlutenUIUtils, SparkListenerSQLExecutionEnd} + +/** A SparkListener that generates complete Gluten UI data after query execution completes. */ +class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener with Logging { + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: SparkListenerSQLExecutionEnd => + onSQLExecutionEnd(e) + case _ => + } + + private def onSQLExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { + try { + val qe = event.qe + if (qe == null) { + // History Server replay or edge case. Rely on per-stage events already in event log. + return + } + + val summary = + GlutenImplicits.collectQueryExecutionFallbackSummary(qe.sparkSession, qe) + + // Combine plan descriptions and fallback reasons from all segments. + val planStringBuilder = new StringBuilder() + planStringBuilder.append("== Physical Plan ==\n") + summary.physicalPlanDescription.foreach(planStringBuilder.append) + val combinedFallbackReasons = + summary.fallbackNodeToReason.foldLeft(Map.empty[String, String])(_ ++ _) + + // Post event to listener bus. The event is serialized to event log, so History Server + // can replay it. GlutenSQLAppStatusListener receives this event and writes to kvstore. 
+ val fallbackEvent = GlutenPlanFallbackEvent( + event.executionId, + summary.numGlutenNodes, + combinedFallbackReasons.size, + planStringBuilder.toString(), + combinedFallbackReasons + ) + GlutenUIUtils.postEvent(sc, fallbackEvent) + } catch { + case e: Exception => + logWarning( + s"Failed to generate complete fallback data for execution ${event.executionId}", + e) + } + } +} + +object GlutenQueryExecutionListener { + + /** Register the listener on the status queue. Should be called once during driver start. */ + def register(sc: SparkContext): Unit = { + if (GlutenUIUtils.uiEnabled(sc)) { + sc.listenerBus.addToStatusQueue(new GlutenQueryExecutionListener(sc)) + } + } +} diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala index a2f608f64d2e..fe516f730058 100644 --- a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala +++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala @@ -42,7 +42,14 @@ private class GlutenSQLAppStatusListener(conf: SparkConf, kvstore: ElementTracki } private def onGlutenPlanFallback(event: GlutenPlanFallbackEvent): Unit = { - val description = executionIdToDescription.get(event.executionId) + // Resolve description: from memory cache, or from kvstore (for complete event after END). 
+ val description = executionIdToDescription.get(event.executionId).orElse { + try { + Some(kvstore.read(classOf[GlutenSQLExecutionUIData], event.executionId).description) + } catch { + case _: NoSuchElementException => None + } + } if (description.isDefined) { val uiData = new GlutenSQLExecutionUIData( event.executionId, diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index da6d58d34c4a..2da437c45dcf 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -233,7 +233,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp events.count( _.fallbackNodeToReason.values.toSet.exists(_.contains( "Could not find a valid substrait mapping name for max" - ))) == 2) + ))) == 3) } finally { spark.sparkContext.removeSparkListener(listener) } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index df84620716fe..f4e6e8203b80 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp events.count( _.fallbackNodeToReason.values.toSet.exists(_.contains( "Could not find a valid substrait mapping name for max" - ))) == 2) + ))) == 3) } finally { spark.sparkContext.removeSparkListener(listener) } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index 
df84620716fe..f4e6e8203b80 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp events.count( _.fallbackNodeToReason.values.toSet.exists(_.contains( "Could not find a valid substrait mapping name for max" - ))) == 2) + ))) == 3) } finally { spark.sparkContext.removeSparkListener(listener) } diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index df84620716fe..f4e6e8203b80 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp events.count( _.fallbackNodeToReason.values.toSet.exists(_.contains( "Could not find a valid substrait mapping name for max" - ))) == 2) + ))) == 3) } finally { spark.sparkContext.removeSparkListener(listener) } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index df84620716fe..f4e6e8203b80 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp events.count( _.fallbackNodeToReason.values.toSet.exists(_.contains( "Could not find a valid substrait mapping name for max" - ))) == 2) + ))) == 3) } finally { spark.sparkContext.removeSparkListener(listener) }