Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import org.apache.gluten.tags.EnhancedFeaturesTest

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.execution.GlutenImplicits._
import org.apache.spark.sql.execution.datasources.v2.AppendDataExec
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.gluten.TestUtils

Expand Down Expand Up @@ -383,4 +385,29 @@ class VeloxIcebergSuite extends IcebergSuite {
}
}
}

test("iceberg native write fallback when validation fails - sort order") {
withTable("iceberg_sorted_tbl") {
spark.sql("CREATE TABLE iceberg_sorted_tbl (a INT, b STRING) USING iceberg")
spark.sql("ALTER TABLE iceberg_sorted_tbl WRITE ORDERED BY a")

val df = spark.sql("INSERT INTO iceberg_sorted_tbl VALUES (1, 'hello'), (2, 'world')")

// Should fallback to vanilla Spark's AppendDataExec.
val commandPlan =
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
assert(commandPlan.isInstanceOf[AppendDataExec])
assert(!commandPlan.isInstanceOf[VeloxIcebergAppendDataExec])

checkAnswer(
spark.sql("SELECT * FROM iceberg_sorted_tbl ORDER BY a"),
Seq(Row(1, "hello"), Row(2, "world")))

// Verify fallbackSummary reports the sort order fallback reason.
val summary = df.fallbackSummary()
assert(
summary.fallbackNodeToReason.exists(
_.values.exists(_.contains("Not support write table with sort order"))))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
assert(fallbackReasons.nonEmpty)
assert(
fallbackReasons.forall(
fallbackReasons.exists(
_.contains("[FallbackByUserOptions] Validation failed on node Sort")))
} finally {
spark.sparkContext.removeSparkListener(listener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.softaffinity.SoftAffinityListener
import org.apache.spark.sql.execution.GlutenQueryExecutionListener
import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener, GlutenUIUtils}
import org.apache.spark.sql.internal.SparkConfigUtil._
Expand All @@ -45,6 +46,7 @@ trait SubstraitBackend extends Backend with Logging {

// Register Gluten listeners
GlutenSQLAppStatusListener.register(sc)
GlutenQueryExecutionListener.register(sc)
if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
SoftAffinityListener.register(sc)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan)
case p: V2CommandExec
if FallbackTags.nonEmpty(p) ||
p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
Expand Down Expand Up @@ -307,6 +311,14 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
plan.foreachUp {
case _: WholeStageCodegenExec =>
case _: InputAdapter =>
case cmd: CommandResultExec =>
currentOperationID = generateOperatorIDs(
cmd.commandPhysicalPlan,
currentOperationID,
visited,
reusedExchanges,
addReusedExchanges)
setOpId(cmd)
case p: AdaptiveSparkPlanExec =>
currentOperationID = generateOperatorIDs(
p.executedPlan,
Expand Down Expand Up @@ -353,6 +365,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
getSubqueries(a.executedPlan, subqueries)
case q: QueryStageExec =>
getSubqueries(q.plan, subqueries)
case cmd: CommandResultExec =>
getSubqueries(cmd.commandPhysicalPlan, subqueries)
case p: SparkPlan =>
p.expressions.foreach(_.collect {
case e: PlanExpression[_] =>
Expand Down Expand Up @@ -383,6 +397,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
plan.foreach {
case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan, p.initialPlan))
case p: QueryStageExec => remove(p, Seq(p.plan))
case cmd: CommandResultExec => remove(cmd, Seq(cmd.commandPhysicalPlan))
case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,24 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession)

private def printFallbackReason(plan: SparkPlan): Unit = {
val validationLogLevel = glutenConf.validationLogLevel
plan.foreachUp {
case _: GlutenPlan => // ignore
case p: SparkPlan if FallbackTags.nonEmpty(p) =>
val tag = FallbackTags.get(p)
logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
// With in next round stage in AQE, the physical plan would be a new instance that
// can not preserve the tag, so we need to set the fallback reason to logical plan.
// Then we can be aware of the fallback reason for the whole plan.
// If a logical plan mapping to several physical plan, we add all reason into
// that logical plan to make sure we do not lose any fallback reason.
p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan, tag))
case _ =>
def printPlan(p: SparkPlan): Unit = {
p.foreachUp {
case _: GlutenPlan => // ignore
case cmd: CommandResultExec =>
printPlan(cmd.commandPhysicalPlan)
case p: SparkPlan if FallbackTags.nonEmpty(p) =>
val tag = FallbackTags.get(p)
logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
// With in next round stage in AQE, the physical plan would be a new instance that
// can not preserve the tag, so we need to set the fallback reason to logical plan.
// Then we can be aware of the fallback reason for the whole plan.
// If a logical plan mapping to several physical plan, we add all reason into
// that logical plan to make sure we do not lose any fallback reason.
p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan, tag))
case _ =>
}
}
printPlan(plan)
}

private def postFallbackReason(plan: SparkPlan): Unit = {
Expand All @@ -91,5 +96,3 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession)
GlutenUIUtils.postEvent(sc, event)
}
}

object GlutenFallbackReporter {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution

import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PlanUtil
import org.apache.spark.sql.{Column, Dataset, SparkSession}
Expand Down Expand Up @@ -107,7 +108,10 @@ object GlutenImplicits {
def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan)
case p: V2CommandExec if FallbackTags.nonEmpty(p) ||
p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) =>
GlutenExplainUtils.handleVanillaSparkPlan(p, fallbackNodeToReason)
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.events.GlutenPlanFallbackEvent

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{GlutenUIUtils, SparkListenerSQLExecutionEnd}

/** A SparkListener that generates complete Gluten UI data after query execution completes. */
/** A SparkListener that generates complete Gluten UI data after query execution completes. */
class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener with Logging {

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionEnd =>
      onSQLExecutionEnd(e)
    case _ =>
  }

  /**
   * Collects the fallback summary of the finished query and posts a `GlutenPlanFallbackEvent`
   * to the listener bus. The event is serialized to the event log, so the History Server can
   * replay it; `GlutenSQLAppStatusListener` receives it and writes to the kvstore.
   *
   * Any non-fatal failure is logged and swallowed so UI bookkeeping can never fail a query.
   */
  private def onSQLExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    import scala.util.control.NonFatal
    try {
      val qe = event.qe
      if (qe == null) {
        // History Server replay or edge case. Rely on per-stage events already in event log.
      } else {
        val summary =
          GlutenImplicits.collectQueryExecutionFallbackSummary(qe.sparkSession, qe)

        // Combine plan descriptions and fallback reasons from all segments.
        val planDescription =
          summary.physicalPlanDescription.mkString("== Physical Plan ==\n", "", "")
        val combinedFallbackReasons =
          summary.fallbackNodeToReason.foldLeft(Map.empty[String, String])(_ ++ _)

        val fallbackEvent = GlutenPlanFallbackEvent(
          event.executionId,
          summary.numGlutenNodes,
          combinedFallbackReasons.size,
          planDescription,
          combinedFallbackReasons
        )
        GlutenUIUtils.postEvent(sc, fallbackEvent)
      }
    } catch {
      // NonFatal instead of `case e: Exception`: a bare Exception handler would also swallow
      // InterruptedException, losing thread interrupts delivered to the listener thread.
      case NonFatal(e) =>
        logWarning(
          s"Failed to generate complete fallback data for execution ${event.executionId}",
          e)
    }
  }
}

object GlutenQueryExecutionListener {

  /** Register the listener on the status queue. Should be called once during driver start. */
  def register(sc: SparkContext): Unit = {
    // Only attach the listener when the Gluten UI is enabled; otherwise the posted
    // fallback events would have no consumer.
    if (GlutenUIUtils.uiEnabled(sc)) {
      sc.listenerBus.addToStatusQueue(new GlutenQueryExecutionListener(sc))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ private class GlutenSQLAppStatusListener(conf: SparkConf, kvstore: ElementTracki
}

private def onGlutenPlanFallback(event: GlutenPlanFallbackEvent): Unit = {
val description = executionIdToDescription.get(event.executionId)
// Resolve description: from memory cache, or from kvstore (for complete event after END).
val description = executionIdToDescription.get(event.executionId).orElse {
try {
Some(kvstore.read(classOf[GlutenSQLExecutionUIData], event.executionId).description)
} catch {
case _: NoSuchElementException => None
}
}
if (description.isDefined) {
val uiData = new GlutenSQLExecutionUIData(
event.executionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
))) == 2)
))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
))) == 2)
))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
))) == 2)
))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
))) == 2)
))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
))) == 2)
))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
Expand Down
Loading