Fix missing exec-to-stageId mapping in Qual tool (#1437)
* Fix missing exec-to-stageId mapping in Qual tool

Fixes #1156

This adds logic to walk the SparkGraph in order to assign execs to
stages. For nodes that have no AccumIDs, the clustering process
relies on adjacent nodes.
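
To make the idea concrete, here is a simplified sketch of the approach; it is not the actual ToolsPlanGraph.createGraphWithStageClusters implementation, and Node, assignStages, and the edge representation are hypothetical stand-ins. Nodes that carry accumulators resolve their stages directly; nodes without accumulators borrow stages from adjacent nodes.

// Hypothetical, simplified stand-ins for the real SparkPlanGraph types.
case class Node(id: Long, accumIds: Seq[Long])

def assignStages(
    nodes: Seq[Node],
    edges: Seq[(Long, Long)], // (fromNodeId, toNodeId)
    accumToStages: Seq[Long] => Set[Int]): Map[Long, Set[Int]] = {
  // First pass: nodes with accumulators resolve their stages directly.
  val direct: Map[Long, Set[Int]] =
    nodes.map(n => n.id -> accumToStages(n.accumIds)).toMap
  // Undirected adjacency list so an orphan node can look both ways.
  val adjacency: Map[Long, Seq[Long]] =
    (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }
  // Second pass: nodes that resolved no stage inherit from their neighbors.
  nodes.map { n =>
    val own = direct.getOrElse(n.id, Set.empty[Int])
    val resolved =
      if (own.nonEmpty) own
      else adjacency.getOrElse(n.id, Seq.empty).flatMap(direct.getOrElse(_, Set.empty[Int])).toSet
    n.id -> resolved
  }.toMap
}

In the real tool the neighbor lookup would be repeated (or the graph walked) until no reachable node is left without a stage; a single pass is shown here for brevity.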

---------

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
amahussein authored Dec 3, 2024
1 parent fb72520 commit 0eb5bf5
Showing 13 changed files with 539 additions and 41 deletions.
@@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.tool.analysis

import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

@@ -88,7 +87,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// Maps stages to operators by checking for non-zero intersection
// between nodeMetrics and stageAccumulateIDs
val nodeIdToStage = planGraph.allNodes.map { node =>
val mappedStages = SQLPlanParser.getStagesInSQLNode(node, app)
val nodeAccums = node.metrics.map(_.accumulatorId)
val mappedStages = app.getStageIDsFromAccumIds(nodeAccums)
((sqlId, node.id), mappedStages)
}.toMap
sqlPlanNodeIdToStageIds ++= nodeIdToStage
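The new mapping path shown above reduces to two steps: collect a node's accumulator IDs from its metrics, then resolve them through the app. A minimal self-contained approximation follows; the map contents are invented for illustration.

// Stand-in for app.getStageIDsFromAccumIds: each accumulator ID may have
// been observed in one or more stages (values are made up).
val accumIdToStages: Map[Long, Set[Int]] =
  Map(101L -> Set(0), 102L -> Set(0, 1))

def getStageIDsFromAccumIds(accumIds: Seq[Long]): Set[Int] =
  accumIds.flatMap(accumIdToStages.getOrElse(_, Set.empty[Int])).toSet

// A node whose metrics carry accumulators 101 and 102 maps to stages 0 and 1.
assert(getStageIDsFromAccumIds(Seq(101L, 102L)) == Set(0, 1))
// A node with no matching accumulators resolves to no stage here; covering
// that case via adjacent nodes is exactly what this commit adds.
assert(getStageIDsFromAccumIds(Seq(999L)).isEmpty)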
@@ -26,7 +26,6 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SparkPlanInfo
//import org.apache.spark.sql.execution.joins.CartesianProductExec
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils, UnsupportedExpr}
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
@@ -433,22 +432,19 @@ object SQLPlanParser extends Logging {
sqlDesc: String,
checker: PluginTypeChecker,
app: AppBase): PlanInfo = {
val planGraph = ToolsPlanGraph(planInfo)
val toolsGraph = ToolsPlanGraph.createGraphWithStageClusters(planInfo, app)

// Find all the node graphs that should be excluded and send it to the parsePlanNode
val excludedNodes = buildSkippedReusedNodesForPlan(planGraph)
val excludedNodes = buildSkippedReusedNodesForPlan(toolsGraph.sparkGraph)
// we want the sub-graph nodes to be inside of the wholeStageCodeGen so use nodes
// vs allNodes
val execInfos = planGraph.nodes.flatMap { node =>
parsePlanNode(node, sqlID, checker, app, reusedNodeIds = excludedNodes)
val execInfos = toolsGraph.nodes.flatMap { node =>
parsePlanNode(node, sqlID, checker, app, reusedNodeIds = excludedNodes,
nodeIdToStagesFunc = toolsGraph.getNodeStageClusters)
}
PlanInfo(appID, sqlID, sqlDesc, execInfos)
}

def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Set[Int] = {
val nodeAccums = node.metrics.map(_.accumulatorId)
nodeAccums.flatMap(app.accumManager.getAccStageIds).toSet
}

// Set containing execs that refer to other expressions. We need this to be a list to allow
// appending more execs in the future as necessary.
// Note that Spark graph may create duplicate nodes when any of the following execs exists.
@@ -541,7 +537,8 @@
sqlID: Long,
checker: PluginTypeChecker,
app: AppBase,
reusedNodeIds: Set[Long]
reusedNodeIds: Set[Long],
nodeIdToStagesFunc: Long => Set[Int]
): Seq[ExecInfo] = {
// Avoid counting duplicate nodes. We mark them as shouldRemove to neutralize their impact on
// speedups.
@@ -560,9 +557,11 @@
// For WholeStageCodegen clusters, use PhotonStageExecParser if the cluster is of Photon type.
// Else, fall back to WholeStageExecParser to parse the cluster.
case photonCluster: PhotonSparkPlanGraphCluster =>
PhotonStageExecParser(photonCluster, checker, sqlID, app, reusedNodeIds).parse
PhotonStageExecParser(photonCluster, checker, sqlID, app, reusedNodeIds,
nodeIdToStagesFunc = nodeIdToStagesFunc).parse
case cluster: SparkPlanGraphCluster =>
WholeStageExecParser(cluster, checker, sqlID, app, reusedNodeIds).parse
WholeStageExecParser(cluster, checker, sqlID, app, reusedNodeIds,
nodeIdToStagesFunc = nodeIdToStagesFunc).parse
case _ =>
// For individual nodes, use PhotonPlanParser if the node is of Photon type.
// Else, fall back to the Spark node parsing logic to parse the node.
@@ -587,7 +586,7 @@
ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id,
isSupported = false, None)
}
val stagesInNode = getStagesInSQLNode(node, app)
val stagesInNode = nodeIdToStagesFunc(node.id)
execInfo.setStages(stagesInNode)
// shouldRemove is set to true if the exec is a member of "execsToBeRemoved" or if the node
// is a duplicate
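Note how the parsers now receive the stage lookup as a plain Long => Set[Int] function instead of re-deriving stages from node metrics. A small sketch of this injection pattern follows; parseNode and fromClusters are hypothetical names, not part of the commit.

// The parser depends only on a lookup function, not on how stages are computed.
def parseNode(nodeId: Long, nodeIdToStagesFunc: Long => Set[Int]): String = {
  val stages = nodeIdToStagesFunc(nodeId)
  s"node $nodeId -> stages ${stages.mkString(",")}"
}

// In production the function is backed by the graph's stage clusters
// (toolsGraph.getNodeStageClusters); in a test it can be a plain map lookup.
val fromClusters: Long => Set[Int] =
  Map(1L -> Set(0), 2L -> Set(0, 1)).withDefaultValue(Set.empty[Int])
println(parseNode(2L, fromClusters)) // node 2 -> stages 0,1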
@@ -27,7 +27,8 @@ abstract class WholeStageExecParserBase(
checker: PluginTypeChecker,
sqlID: Long,
app: AppBase,
reusedNodeIds: Set[Long]) extends Logging {
reusedNodeIds: Set[Long],
nodeIdToStagesFunc: Long => Set[Int]) extends Logging {

val fullExecName = "WholeStageCodegenExec"

@@ -38,31 +39,27 @@
// Perhaps take the max of those in Stage?
val accumId = node.metrics.find(_.name == "duration").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)
val stagesInNode = SQLPlanParser.getStagesInSQLNode(node, app)
val stagesInNode = nodeIdToStagesFunc.apply(node.id)
// We could skip the entire wholeStage if it is duplicate; but we will lose the information of
// the children nodes.
val isDupNode = reusedNodeIds.contains(node.id)
val childNodes = node.nodes.flatMap { c =>
SQLPlanParser.parsePlanNode(c, sqlID, checker, app, reusedNodeIds)
// Pass the nodeIdToStagesFunc to the child nodes so they can get the stages.
SQLPlanParser.parsePlanNode(c, sqlID, checker, app, reusedNodeIds,
nodeIdToStagesFunc = nodeIdToStagesFunc)
}
// For the childNodes, we need to append the stages. Otherwise, nodes without metrics won't be
// assigned to a stage.
childNodes.foreach(_.appendToStages(stagesInNode))
// if any of the execs in the WholeStageCodegen is supported, mark the entire node as supported
val anySupported = childNodes.exists(_.isSupported == true)
val unSupportedExprsArray =
childNodes.filter(_.unsupportedExprs.nonEmpty).flatMap(x => x.unsupportedExprs).toArray
// average speedup across the execs in the WholeStageCodegen for now
val supportedChildren = childNodes.filterNot(_.shouldRemove)
val avSpeedupFactor = SQLPlanParser.averageSpeedup(supportedChildren.map(_.speedupFactor))
// can't rely on the wholeStageCodegen having a stage if children do, so aggregate them together
// for now
val allStagesIncludingChildren = childNodes.flatMap(_.stages).toSet ++ stagesInNode.toSet
// Finally, the node should be marked as shouldRemove when all the children of the
// The node should be marked as shouldRemove when all the children of the
// wholeStageCodeGen are marked as shouldRemove.
val removeNode = isDupNode || childNodes.forall(_.shouldRemove)
val execInfo = ExecInfo(node, sqlID, node.name, node.name, avSpeedupFactor, maxDuration,
node.id, anySupported, Some(childNodes), allStagesIncludingChildren,
node.id, anySupported, Some(childNodes), stagesInNode,
shouldRemove = removeNode, unsupportedExprs = unSupportedExprsArray)
Seq(execInfo)
}
@@ -73,5 +70,6 @@ case class WholeStageExecParser(
checker: PluginTypeChecker,
sqlID: Long,
app: AppBase,
reusedNodeIds: Set[Long])
extends WholeStageExecParserBase(node, checker, sqlID, app, reusedNodeIds)
reusedNodeIds: Set[Long],
nodeIdToStagesFunc: Long => Set[Int])
extends WholeStageExecParserBase(node, checker, sqlID, app, reusedNodeIds, nodeIdToStagesFunc)
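
The appendToStages push-down in the hunk above can be pictured with a toy example; ExecNode here is a hypothetical stand-in for ExecInfo.

// Stand-in for ExecInfo: stages is mutable so a WholeStageCodegen parent can
// push its cluster's stages down to metric-less children.
final class ExecNode(val name: String, var stages: Set[Int]) {
  def appendToStages(more: Set[Int]): Unit = stages = stages ++ more
}

val wholeStageStages = Set(3) // stages resolved for the codegen cluster
val children = Seq(new ExecNode("Filter", Set.empty[Int]),
  new ExecNode("Project", Set(3)))
// Children that resolved no stage of their own would otherwise stay unassigned.
children.foreach(_.appendToStages(wholeStageStages))
assert(children.forall(_.stages == Set(3)))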
@@ -35,5 +35,6 @@ case class PhotonStageExecParser(
checker: PluginTypeChecker,
sqlID: Long,
app: AppBase,
reusedNodeIds: Set[Long])
extends WholeStageExecParserBase(node, checker, sqlID, app, reusedNodeIds)
reusedNodeIds: Set[Long],
nodeIdToStagesFunc: Long => Set[Int])
extends WholeStageExecParserBase(node, checker, sqlID, app, reusedNodeIds, nodeIdToStagesFunc)
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.tool

/**
* Trait that defines the interface for retrieving stage IDs from accumulables.
* This is used to map accumulables to stages. We define it as an interface in order to separate
* the logic and to allow dummy implementations and mocks for testing when needed.
*/
trait AccumToStageRetriever {
/**
* Given a sequence of accumIds, return a set of stage IDs that are associated with the
* accumIds. Note that this method can only be called after the accumulables have been fully
* processed.
*/
def getStageIDsFromAccumIds(accumIds: Seq[Long]): Set[Int]
}
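
As the scaladoc suggests, the trait also makes test doubles straightforward. A minimal mock might look like the following; MapBackedRetriever is illustrative and not part of this commit.

// A fixed-map mock of AccumToStageRetriever for unit tests.
class MapBackedRetriever(table: Map[Long, Set[Int]]) extends AccumToStageRetriever {
  override def getStageIDsFromAccumIds(accumIds: Seq[Long]): Set[Int] =
    accumIds.flatMap(table.getOrElse(_, Set.empty[Int])).toSet
}

val retriever = new MapBackedRetriever(Map(7L -> Set(2), 8L -> Set(2, 4)))
assert(retriever.getStageIDsFromAccumIds(Seq(7L, 8L)) == Set(2, 4))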
@@ -42,7 +42,9 @@ import org.apache.spark.util.Utils

abstract class AppBase(
val eventLogInfo: Option[EventLogInfo],
val hadoopConf: Option[Configuration]) extends Logging with ClusterTagPropHandler {
val hadoopConf: Option[Configuration]) extends Logging
with ClusterTagPropHandler
with AccumToStageRetriever {

var appMetaData: Option[AppMetaData] = None

@@ -105,6 +107,10 @@

def sqlPlans: immutable.Map[Long, SparkPlanInfo] = sqlManager.getPlanInfos

def getStageIDsFromAccumIds(accumIds: Seq[Long]): Set[Int] = {
accumIds.flatMap(accumManager.getAccStageIds).toSet
}

// Returns the String value of the eventlog or empty if it is not defined. Note that the eventlog
// won't be defined for running applications
def getEventLogPath: String = {
