Skip to content

Commit

Permalink
Addressed a few review comments
Browse files Browse the repository at this point in the history
1. Use a default Exec name
2. Added an `execName` argument to the GenericExecParser constructor
  • Loading branch information
nartal1 committed Nov 4, 2024
1 parent 88e9c57 commit 2dc2146
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 74 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ class GenericExecParser(
val node: SparkPlanGraphNode,
val checker: PluginTypeChecker,
val sqlID: Long,
val execName: Option[String] = None,
val expressionFunction: Option[String => Array[String]] = None,
val app: Option[AppBase] = None
) extends ExecParser {

val fullExecName: String = node.name + "Exec"
val fullExecName: String = execName.getOrElse(node.name + "Exec")

override def parse: ExecInfo = {
val duration = computeDuration
Expand Down Expand Up @@ -99,9 +100,11 @@ object GenericExecParser {
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long,
execName: Option[String] = None,
expressionFunction: Option[String => Array[String]] = None,
app: Option[AppBase] = None
): GenericExecParser = {
new GenericExecParser(node, checker, sqlID, expressionFunction, app)
val fullExecName = execName.getOrElse(node.name + "Exec")
new GenericExecParser(node, checker, sqlID, Some(fullExecName), expressionFunction, app)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ case class ObjectHashAggregateExecParser(
override val checker: PluginTypeChecker,
override val sqlID: Long,
appParam: AppBase) extends
GenericExecParser(node, checker, sqlID) with Logging {
GenericExecParser(node, checker, sqlID, app = Some(appParam)) with Logging {
override def computeDuration: Option[Long] = {
val accumId = node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId)
SQLPlanParser.getTotalDuration(accumId, appParam)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SparkPlanInfo
//import org.apache.spark.sql.execution.joins.CartesianProductExec
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils, UnsupportedExpr}
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
Expand Down Expand Up @@ -463,10 +464,12 @@ object SQLPlanParser extends Logging {
app: AppBase): ExecInfo = {
val normalizedNodeName = node.name.stripSuffix("$")
normalizedNodeName match {
case "AggregateInPandas" =>
GenericExecParser(node, checker, sqlID).parse
case "ArrowEvalPython" =>
GenericExecParser(node, checker, sqlID).parse
// Generalize all the execs that call GenericExecParser in one case
case "AggregateInPandas" | "ArrowEvalPython" | "CartesianProduct" | "Coalesce"
| "CollectLimit" | "FlatMapGroupsInPandas" | "GlobalLimit" | "LocalLimit"
| "InMemoryTableScan" | "MapInPandas" | "PythonMapInArrow" | "MapInArrow" | "Range"
| "Sample" | "Union" | "WindowInPandas" =>
GenericExecParser(node, checker, sqlID, app = Some(app)).parse
case "BatchScan" =>
BatchScanExecParser(node, checker, sqlID, app).parse
case "BroadcastExchange" =>
Expand All @@ -475,68 +478,53 @@ object SQLPlanParser extends Logging {
BroadcastHashJoinExecParser(node, checker, sqlID).parse
case "BroadcastNestedLoopJoin" =>
BroadcastNestedLoopJoinExecParser(node, checker, sqlID).parse
case "CartesianProduct" =>
GenericExecParser(node, checker, sqlID).parse
case "Coalesce" =>
GenericExecParser(node, checker, sqlID).parse
case "CollectLimit" =>
GenericExecParser(node, checker, sqlID).parse
// This is called either AQEShuffleRead and CustomShuffleReader depending
// on the Spark version, our supported ops list it as CustomShuffleReader
case "CustomShuffleReader" | "AQEShuffleRead" =>
CustomShuffleReaderExecParser(node, checker, sqlID).parse
GenericExecParser(
node, checker, sqlID, execName = Some("CustomShuffleReaderExec")).parse
case "Exchange" =>
ShuffleExchangeExecParser(node, checker, sqlID, app).parse
case "Expand" =>
GenericExecParser(node, checker, sqlID, Some(parseExpandExpressions)).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseExpandExpressions)).parse
case "Filter" =>
GenericExecParser(node, checker, sqlID, Some(parseFilterExpressions)).parse
case "FlatMapGroupsInPandas" =>
GenericExecParser(node, checker, sqlID).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseFilterExpressions)).parse
case "Generate" =>
GenericExecParser(node, checker, sqlID, Some(parseGenerateExpressions)).parse
case "GlobalLimit" =>
GenericExecParser(node, checker, sqlID).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseGenerateExpressions)).parse
case "HashAggregate" =>
HashAggregateExecParser(node, checker, sqlID, app).parse
case "LocalLimit" =>
GenericExecParser(node, checker, sqlID).parse
case "InMemoryTableScan" =>
GenericExecParser(node, checker, sqlID).parse
case i if DataWritingCommandExecParser.isWritingCmdExec(i) =>
DataWritingCommandExecParser.parseNode(node, checker, sqlID)
case "MapInPandas" =>
GenericExecParser(node, checker, sqlID).parse
case "ObjectHashAggregate" =>
ObjectHashAggregateExecParser(node, checker, sqlID, app).parse
ObjectHashAggregateExecParser(node, checker, sqlID, appParam = app).parse
case "Project" =>
GenericExecParser(node, checker, sqlID, Some(parseProjectExpressions)).parse
case "PythonMapInArrow" | "MapInArrow" =>
GenericExecParser(node, checker, sqlID).parse
case "Range" =>
GenericExecParser(node, checker, sqlID).parse
case "Sample" =>
GenericExecParser(node, checker, sqlID).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseProjectExpressions)).parse
case "ShuffledHashJoin" =>
ShuffledHashJoinExecParser(node, checker, sqlID, app).parse
case "Sort" =>
GenericExecParser(node, checker, sqlID, Some(parseSortExpressions)).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseSortExpressions)).parse
case s if ReadParser.isScanNode(s) =>
FileSourceScanExecParser(node, checker, sqlID, app).parse
case "SortAggregate" =>
GenericExecParser(node, checker, sqlID, Some(parseAggregateExpressions)).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseAggregateExpressions)).parse
case smj if SortMergeJoinExecParser.accepts(smj) =>
SortMergeJoinExecParser(node, checker, sqlID).parse
case "SubqueryBroadcast" =>
SubqueryBroadcastExecParser(node, checker, sqlID, app).parse
case sqe if SubqueryExecParser.accepts(sqe) =>
SubqueryExecParser.parseNode(node, checker, sqlID, app)
case "TakeOrderedAndProject" =>
GenericExecParser(node, checker, sqlID, Some(parseTakeOrderedExpressions)).parse
case "Union" =>
GenericExecParser(node, checker, sqlID).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseTakeOrderedExpressions)).parse
case "Window" =>
GenericExecParser(node, checker, sqlID, Some(parseWindowExpressions)).parse
case "WindowInPandas" =>
GenericExecParser(node, checker, sqlID).parse
GenericExecParser(
node, checker, sqlID, expressionFunction = Some(parseWindowExpressions)).parse
case "WindowGroupLimit" =>
WindowGroupLimitParser(node, checker, sqlID).parse
case wfe if WriteFilesExecParser.accepts(wfe) =>
Expand Down

0 comments on commit 2dc2146

Please sign in to comment.