From 2bb2643e53b2bada1e86b866c29f734a2d5f8f6a Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Mon, 2 Dec 2024 11:35:50 -0600 Subject: [PATCH 1/8] first iterations all the UTs pass Signed-off-by: Ahmed Hussein (amahussein) --- .../tool/planparser/BatchScanExecParser.scala | 4 +- .../BroadcastExchangeExecParser.scala | 2 +- .../BroadcastHashJoinExecParser.scala | 3 +- .../BroadcastNestedLoopJoinExecParser.scala | 3 +- .../DataWritingCommandExecParser.scala | 5 +- .../tool/planparser/DeltaLakeHelper.scala | 2 +- .../rapids/tool/planparser/ExecParser.scala | 5 +- .../planparser/FileSourceScanExecParser.scala | 4 +- .../tool/planparser/GenericExecParser.scala | 14 ++++-- .../tool/planparser/SQLPlanParser.scala | 42 ++++++++++------ .../ShuffleExchangeExecParser.scala | 3 +- .../ShuffledHashJoinExecParser.scala | 2 +- .../planparser/SortMergeJoinExecParser.scala | 3 +- .../SubqueryBroadcastExecParser.scala | 4 +- .../tool/planparser/SubqueryExecParser.scala | 3 +- .../planparser/WholeStageExecParser.scala | 8 +++- .../planparser/WindowGroupLimitParser.scala | 9 ++-- .../planparser/WriteFilesExecParser.scala | 2 +- .../rapids/tool/planparser/ops/OpRef.scala | 48 +++++++++++++++++++ .../tool/planparser/ops/OperatorRefBase.scala | 31 ++++++++++++ .../planparser/ops/OperatorRefTrait.scala | 24 ++++++++++ .../planparser/ops/UnsupportedExprOpRef.scala | 27 +++++++++++ .../qualification/PluginTypeChecker.scala | 6 +-- .../spark/sql/rapids/tool/ToolUtils.scala | 2 - .../qualification/QualificationAppInfo.scala | 2 +- .../tool/planparser/SqlPlanParserSuite.scala | 5 +- .../PluginTypeCheckerSuite.scala | 2 +- 27 files changed, 211 insertions(+), 54 deletions(-) create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala index a8188065c..ac126396b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala @@ -41,6 +41,8 @@ case class BatchScanExecParser( // TODO - add in parsing expressions - average speedup across? ExecInfo(node, sqlID, s"${node.name} ${readInfo.format}", s"Format: ${readInfo.format}", - overallSpeedup, maxDuration, node.id, score > 0, None) + overallSpeedup, maxDuration, node.id, score > 0, + children = None, + expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala index f9c348ea6..ea06cf7ab 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala @@ -46,6 +46,6 @@ case class BroadcastExchangeExecParser( } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, - duration, node.id, isSupported, None) + duration, node.id, isSupported, children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala index 8426f1047..4c3a6fbd1 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala @@ -40,6 +40,7 @@ case class BroadcastHashJoinExecParser( (1.0, false) } // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, + children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala index 49f242efc..488f1118e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala @@ -52,7 +52,8 @@ abstract class BroadcastNestedLoopJoinExecParserBase( (1.0, false) } // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, + children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala index fedbd2ceb..5b799c2d8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala @@ -41,7 +41,8 @@ case class DataWritingCommandExecParser( // We do not want to parse the node description to avoid mistakenly marking the node as RDD/UDF ExecInfo.createExecNoNode(sqlID, s"${node.name.trim} ${wStub.dataFormat.toLowerCase.trim}", s"Format: ${wStub.dataFormat.toLowerCase.trim}", - finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported, None) + finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported, + children = None, expressions = Seq.empty) } } @@ -175,4 +176,4 @@ object DataWritingCommandExecParser { parsedString.split(",")(0) // return third parameter from the input string } } -} \ No newline at end of file +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala index 82ece4f45..5d25c1705 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala @@ -48,7 +48,7 @@ class DLWriteWithFormatAndSchemaParser(node: SparkPlanGraphNode, val nodeName = node.name.replace("Execute ", "") ExecInfo.createExecNoNode(sqlID, nodeName, s"Format: $dataFormat", finalSpeedupFactor, None, 
node.id, OpTypes.WriteExec, - isSupported = writeSupported && isExecSupported, children = None) + isSupported = writeSupported && isExecSupported, children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala index b43d9943f..c0ea0f369 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.tool.planparser -import org.apache.spark.sql.rapids.tool.UnsupportedExpr +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef trait ExecParser { def parse: ExecInfo @@ -32,5 +32,6 @@ trait ExecParser { * @param expressions Array of expression strings to evaluate for support. * @return Empty Seq[UnsupportedExpr], indicating no unsupported expressions by default. */ - def getUnsupportedExprReasonsForExec(expressions: Array[String]): Seq[UnsupportedExpr] = Seq.empty + def getUnsupportedExprReasonsForExec( + expressions: Array[String]): Seq[UnsupportedExprOpRef] = Seq.empty } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala index 85387872f..bcda8a1a9 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala @@ -46,7 +46,7 @@ case class FileSourceScanExecParser( nodeName } ExecInfo.createExecNoNode(sqlID, newNodeName, "", 1.0, duration = None, - node.id, OpTypes.ReadRDD, false, None) + node.id, OpTypes.ReadRDD, false, children = None, expressions = Seq.empty) } else { val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId) val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) @@ -64,7 +64,7 @@ case class FileSourceScanExecParser( // TODO - add in parsing expressions - average speedup across? 
ExecInfo.createExecNoNode(sqlID, nodeName, "", overallSpeedup, maxDuration, - node.id, OpTypes.ReadExec, score > 0, None) + node.id, OpTypes.ReadExec, score > 0, children = None, expressions = Seq.empty) } } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala index 62ffb5eaa..6295c5533 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala @@ -16,10 +16,11 @@ package com.nvidia.spark.rapids.tool.planparser +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.sql.execution.ui.SparkPlanGraphNode -import org.apache.spark.sql.rapids.tool.{AppBase, UnsupportedExpr} +import org.apache.spark.sql.rapids.tool.AppBase class GenericExecParser( val node: SparkPlanGraphNode, @@ -46,7 +47,8 @@ class GenericExecParser( (1.0, false) } - createExecInfo(speedupFactor, isSupported, duration, notSupportedExprs) + createExecInfo(speedupFactor, isSupported, duration, + notSupportedExprs = notSupportedExprs, expressions = expressions) } protected def parseExpressions(): Array[String] = { @@ -63,7 +65,7 @@ class GenericExecParser( node.desc.replaceFirst(s"^${node.name}\\s*", "") } - protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExpr] = { + protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExprOpRef] = { checker.getNotSupportedExprs(expressions) } @@ -83,7 +85,8 @@ class GenericExecParser( speedupFactor: Double, isSupported: Boolean, duration: Option[Long], - notSupportedExprs: Seq[UnsupportedExpr] + notSupportedExprs: Seq[UnsupportedExprOpRef], + expressions: Array[String] ): ExecInfo = { ExecInfo( node, @@ -95,7 +98,8 @@ class GenericExecParser( node.id, isSupported, None, - unsupportedExprs = notSupportedExprs + unsupportedExprs = notSupportedExprs, + expressions = expressions ) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index ad2634c41..274ce2d73 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -21,16 +21,18 @@ import scala.collection.mutable.{ArrayBuffer, WeakHashMap} import scala.util.control.NonFatal import scala.util.matching.Regex +import com.nvidia.spark.rapids.tool.planparser.ops.{OpRef, UnsupportedExprOpRef} import com.nvidia.spark.rapids.tool.planparser.photon.{PhotonPlanParser, PhotonStageExecParser} import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode} -import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils, UnsupportedExpr} +import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils} import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph import org.apache.spark.sql.rapids.tool.util.plangraph.{PhotonSparkPlanGraphCluster, PhotonSparkPlanGraphNode} + object OpActions extends Enumeration { type 
OpAction = Value val NONE, IgnoreNoPerf, IgnorePerf, Triage = Value @@ -92,7 +94,7 @@ case class UnsupportedExecSummary( case class ExecInfo( sqlID: Long, - exec: String, + execRef: OpRef, expr: String, speedupFactor: Double, duration: Option[Long], @@ -103,10 +105,11 @@ case class ExecInfo( var stages: Set[Int], var shouldRemove: Boolean, var unsupportedExecReason: String, - unsupportedExprs: Seq[UnsupportedExpr], + unsupportedExprs: Seq[UnsupportedExprOpRef], dataSet: Boolean, udf: Boolean, - shouldIgnore: Boolean) { + shouldIgnore: Boolean, + expressions: Seq[OpRef]) { private def childrenToString = { val str = children.map { c => @@ -119,6 +122,8 @@ case class ExecInfo( } } + def exec: String = execRef.value + override def toString: String = { s"exec: $exec, expr: $expr, sqlID: $sqlID , speedupFactor: $speedupFactor, " + s"duration: $duration, nodeId: $nodeId, " + @@ -211,7 +216,7 @@ case class ExecInfo( unsupportedExprs.foreach { expr => val exprUnsupportedReason = determineUnsupportedReason(expr.unsupportedReason, exprKnownReason) - res += UnsupportedExecSummary(sqlID, execId, expr.exprName, OpTypes.Expr, + res += UnsupportedExecSummary(sqlID, execId, expr.getOpNameCSV, OpTypes.Expr, exprUnsupportedReason, getOpAction, isExpression = true) } } @@ -235,9 +240,10 @@ object ExecInfo { stages: Set[Int] = Set.empty, shouldRemove: Boolean = false, unsupportedExecReason: String = "", - unsupportedExprs: Seq[UnsupportedExpr] = Seq.empty, + unsupportedExprs: Seq[UnsupportedExprOpRef] = Seq.empty, dataSet: Boolean = false, - udf: Boolean = false): ExecInfo = { + udf: Boolean = false, + expressions: Seq[String]): ExecInfo = { // Set the ignoreFlag // 1- we ignore any exec with UDF // 2- we ignore any exec with dataset @@ -259,7 +265,7 @@ object ExecInfo { val supportedFlag = isSupported && !udf && !finalDataSet ExecInfo( sqlID, - exec, + OpRef.fromExec(exec), expr, speedupFactor, duration, @@ -273,7 +279,9 @@ object ExecInfo { unsupportedExprs, finalDataSet, udf, - shouldIgnore + shouldIgnore, + // convert array of string expressions to OpRefs + expressions = expressions.map(OpRef.fromExpr) ) } @@ -290,10 +298,11 @@ object ExecInfo { stages: Set[Int] = Set.empty, shouldRemove: Boolean = false, unsupportedExecReason:String = "", - unsupportedExprs: Seq[UnsupportedExpr] = Seq.empty, + unsupportedExprs: Seq[UnsupportedExprOpRef] = Seq.empty, dataSet: Boolean = false, udf: Boolean = false, - opType: OpTypes.OpType = OpTypes.Exec): ExecInfo = { + opType: OpTypes.OpType = OpTypes.Exec, + expressions: Seq[String]): ExecInfo = { // Some execs need to be trimmed such as "Scan" // Example: Scan parquet . -> Scan parquet. // scan nodes needs trimming @@ -307,7 +316,7 @@ object ExecInfo { // if the expression is RDD because of the node name, then we do not want to add the // unsupportedExpressions because it becomes bogus. val finalUnsupportedExpr = if (rddCheckRes.nodeDescRDD) { - Seq.empty[UnsupportedExpr] + Seq.empty[UnsupportedExprOpRef] } else { unsupportedExprs } @@ -326,7 +335,8 @@ object ExecInfo { unsupportedExecReason, finalUnsupportedExpr, ds, - containsUDF + containsUDF, + expressions = expressions ) } } @@ -528,7 +538,8 @@ object SQLPlanParser extends Logging { // supported but with shouldRemove flag set to True. // Setting the "shouldRemove" is handled at the end of the function. 
ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, - isSupported = reuseExecs.contains(normalizedNodeName), None) + isSupported = reuseExecs.contains(normalizedNodeName), children = None, + expressions = Seq.empty) } } @@ -584,7 +595,8 @@ object SQLPlanParser extends Logging { logWarning(s"Unexpected error parsing plan node ${normalizedNodeName}. " + s" sqlID = ${sqlID}", e) ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, - isSupported = false, None) + isSupported = false, children = None, + expressions = Seq.empty) } val stagesInNode = nodeIdToStagesFunc(node.id) execInfo.setStages(stagesInNode) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala index 2a1315718..f79ffeff0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala @@ -45,6 +45,7 @@ case class ShuffleExchangeExecParser( (1.0, false) } // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, + children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala index b81f33fd6..303705de5 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala @@ -46,6 +46,6 @@ case class ShuffledHashJoinExecParser( // TODO - add in parsing expressions - average speedup across? ExecInfo(node, sqlID, node.name, "", speedupFactor, - maxDuration, node.id, isSupported, None) + maxDuration, node.id, isSupported, children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala index f98a0542b..d85123afe 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala @@ -43,7 +43,8 @@ case class SortMergeJoinExecParser( (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
- ExecInfo(node, sqlID, opName, "", speedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, opName, "", speedupFactor, duration, node.id, isSupported, + children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala index 64fc5ff5e..45fabc47e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala @@ -39,7 +39,7 @@ case class SubqueryBroadcastExecParser( (1.0, false) } // TODO - check is broadcast associated can be replaced - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, + children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala index c60eff7ac..e01f7a27a 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala @@ -39,7 +39,8 @@ case class SubqueryExecParser( // TODO: Should we also collect the "data size" metric? val duration = SQLPlanParser.getDriverTotalDuration(collectTimeId, app) // should remove is kept in 1 place. So no need to set it here. - ExecInfo(node, sqlID, node.name, "", 1.0, duration, node.id, isSupported = false, None) + ExecInfo(node, sqlID, node.name, "", 1.0, duration, node.id, isSupported = false, + children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala index cdef3b225..a6da873cf 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala @@ -59,8 +59,12 @@ abstract class WholeStageExecParserBase( // wholeStageCodeGen are marked as shouldRemove. val removeNode = isDupNode || childNodes.forall(_.shouldRemove) val execInfo = ExecInfo(node, sqlID, node.name, node.name, avSpeedupFactor, maxDuration, - node.id, anySupported, Some(childNodes), stagesInNode, - shouldRemove = removeNode, unsupportedExprs = unSupportedExprsArray) + node.id, anySupported, + Some(childNodes), stages = stagesInNode, + shouldRemove = removeNode, + unsupportedExprs = unSupportedExprsArray, + // expressions of wholeStageCodeGen should not be set. They belong to the children nodes. 
+ expressions = Seq.empty) Seq(execInfo) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala index 01b5edc3d..89f76e6c9 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala @@ -16,10 +16,10 @@ package com.nvidia.spark.rapids.tool.planparser +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.sql.execution.ui.SparkPlanGraphNode -import org.apache.spark.sql.rapids.tool.UnsupportedExpr case class WindowGroupLimitParser( node: SparkPlanGraphNode, @@ -36,10 +36,10 @@ case class WindowGroupLimitParser( } override def getUnsupportedExprReasonsForExec( - expressions: Array[String]): Seq[UnsupportedExpr] = { + expressions: Array[String]): Seq[UnsupportedExprOpRef] = { expressions.flatMap { expr => if (!supportedRankingExprs.contains(expr)) { - Some(UnsupportedExpr(expr, + Some(UnsupportedExprOpRef(expr, s"Ranking function $expr is not supported in $fullExecName")) } else { None @@ -69,8 +69,7 @@ case class WindowGroupLimitParser( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? ExecInfo(node, sqlID, node.name, "", speedupFactor, None, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) + unsupportedExprs = notSupportedExprs, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala index bcd2272e0..d226df19c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala @@ -44,7 +44,7 @@ case class WriteFilesExecParser( "", speedupFactor, duration, - node.id, opType = OpTypes.WriteExec, true, None) + node.id, opType = OpTypes.WriteExec, true, children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala new file mode 100644 index 000000000..18c6613a9 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.nvidia.spark.rapids.tool.planparser.ops
+
+import java.util.concurrent.ConcurrentHashMap
+
+import com.nvidia.spark.rapids.tool.planparser.OpTypes
+
+
+case class OpRef(override val value: String,
+    override val opType: OpTypes.OpType) extends OperatorRefBase(value, opType)
+
+object OpRef {
+  // Dummy OpRef to represent an empty operator name. This is an optimization to avoid
+  // storing an Option[String] for all operator names, which leads to "get-or-else" everywhere.
+  private val EMPTY_OP_NAME_REF: OpRef = new OpRef("", OpTypes.Exec)
+  // A global table to store references to all operator names. The map is accessible by all
+  // threads (different applications) running in parallel. This avoids duplicate work across
+  // different threads.
+  val OP_NAMES: ConcurrentHashMap[String, OpRef] = {
+    val initMap = new ConcurrentHashMap[String, OpRef]()
+    // Seed the map with the empty entry because it is used internally.
+    initMap.put(EMPTY_OP_NAME_REF.value, EMPTY_OP_NAME_REF)
+    initMap
+  }
+
+  def fromExpr(name: String): OpRef = {
+    OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Expr))
+  }
+
+  def fromExec(name: String): OpRef = {
+    OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Exec))
+  }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala
new file mode 100644
index 000000000..c4c54a692
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.planparser.ops
+
+import com.nvidia.spark.rapids.tool.planparser.OpTypes
+
+import org.apache.spark.sql.rapids.tool.util.StringUtils
+
+class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait {
+  val csvValue: String = StringUtils.reformatCSVString(value)
+  val csvOpType: String = StringUtils.reformatCSVString(opType.toString)
+
+  override def getOpName: String = value
+  override def getOpNameCSV: String = csvValue
+  override def getOpType: String = opType.toString
+  override def getOpTypeCSV: String = csvOpType
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala
new file mode 100644
index 000000000..40385ba7a
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser.ops + +trait OperatorRefTrait { + def getOpName: String + def getOpNameCSV: String + def getOpType: String + def getOpTypeCSV: String +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala new file mode 100644 index 000000000..ef03faa64 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser.ops + +case class UnsupportedExprOpRef(opRef: OpRef, + unsupportedReason: String) extends OperatorRefBase(opRef.value, opRef.opType) + +object UnsupportedExprOpRef { + def apply(exprName: String, unsupportedReason: String): UnsupportedExprOpRef = { + val opRef = OpRef.fromExpr(exprName) + UnsupportedExprOpRef(opRef, unsupportedReason) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala index f04ddcac7..44bbf9fa3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala @@ -21,11 +21,11 @@ import scala.io.BufferedSource import scala.util.control.NonFatal import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory} +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging -import org.apache.spark.sql.rapids.tool.UnsupportedExpr import org.apache.spark.sql.rapids.tool.util.UTF8Source object OpSuppLevel extends Enumeration { @@ -387,11 +387,11 @@ class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(), exprSupported.isSupported } - def getNotSupportedExprs(exprs: Seq[String]): Seq[UnsupportedExpr] = { + def getNotSupportedExprs(exprs: Seq[String]): Seq[UnsupportedExprOpRef] = { exprs.collect { case expr if !isExprSupported(expr) => val reason = unsupportedOpsReasons.getOrElse(expr, "") - UnsupportedExpr(expr, reason) + UnsupportedExprOpRef(expr, reason) } } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index 
fc276a064..021337495 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -401,8 +401,6 @@ object ExecHelper { } } -case class UnsupportedExpr(exprName: String, unsupportedReason: String) - object MlOps { val sparkml = "spark.ml." val xgBoost = "spark.XGBoost" diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 887075c8c..9c291114d 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -582,7 +582,7 @@ class QualificationAppInfo( // Get all the unsupported Expressions from the plan val unSupportedExprs = origPlanInfos.map(_.execInfo.flatMap( - _.unsupportedExprs.map(_.exprName))).flatten.filter(_.nonEmpty).toSet.mkString(";") + _.unsupportedExprs.map(_.getOpName))).flatten.filter(_.nonEmpty).toSet.mkString(";") .trim.replaceAll("\n", "").replace(",", ":") // TODO - this is not correct as this is using the straight stage wall diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index 6fc9357cf..b51250827 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -1224,7 +1224,8 @@ class SQLPlanParserSuite extends BasePlanParserSuite { val projects = allExecInfo.filter(_.exec.contains("Project")) assertSizeAndNotSupported(1, projects) val expectedExprss = Seq("parse_url_ref", "parse_url_userinfo") - projects(0).unsupportedExprs.map(_.exprName) should contain theSameElementsAs expectedExprss + projects(0) + .unsupportedExprs.map(_.getOpName) should contain theSameElementsAs expectedExprss } } } @@ -1802,7 +1803,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite { val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq) val windowExecNotSupportedExprs = allExecInfo.filter( _.exec.contains(windowGroupLimitExecCmd)).flatMap(x => x.unsupportedExprs) - windowExecNotSupportedExprs.head.exprName shouldEqual "row_number" + windowExecNotSupportedExprs.head.getOpName shouldEqual "row_number" windowExecNotSupportedExprs.head.unsupportedReason shouldEqual "Ranking function row_number is not supported in WindowGroupLimitExec" val windowGroupLimitExecs = allExecInfo.filter(_.exec.contains(windowGroupLimitExecCmd)) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala index 3e9988c41..6b7821d3f 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala @@ -241,6 +241,6 @@ class PluginTypeCheckerSuite extends FunSuite with Logging { val expressions = SQLPlanParser.parseAggregateExpressions(hashAggregateExpr) assert(expressions.contains("decimalsum")) val notSupportedExprs = checker.getNotSupportedExprs(expressions) - assert(notSupportedExprs.find(_.exprName == "decimalsum").isEmpty) + assert(notSupportedExprs.find(_.getOpName == 
"decimalsum").isEmpty) } } From 344bfa9268fccfafbac94990b0e08b1070d31182 Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Mon, 2 Dec 2024 15:46:03 -0600 Subject: [PATCH 2/8] running end-to-end Signed-off-by: Ahmed Hussein (amahussein) --- .../tool/planparser/SQLPlanParser.scala | 35 +++++--- .../planparser/WholeStageExecParser.scala | 7 +- .../tool/planparser/ops/OperatorCounter.scala | 81 +++++++++++++++++++ .../tool/qualification/QualOutputWriter.scala | 73 ++++++++++++++++- .../tool/qualification/Qualification.scala | 1 + .../qualification/QualificationAppInfo.scala | 9 +-- 6 files changed, 185 insertions(+), 21 deletions(-) create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 274ce2d73..17159842b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, WeakHashMap} import scala.util.control.NonFatal import scala.util.matching.Regex -import com.nvidia.spark.rapids.tool.planparser.ops.{OpRef, UnsupportedExprOpRef} +import com.nvidia.spark.rapids.tool.planparser.ops.{OperatorRefBase, OpRef, UnsupportedExprOpRef} import com.nvidia.spark.rapids.tool.planparser.photon.{PhotonPlanParser, PhotonStageExecParser} import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker @@ -75,11 +75,10 @@ object UnsupportedReasons extends Enumeration { case class UnsupportedExecSummary( sqlId: Long, execId: Long, - execValue: String, + execRef: OperatorRefBase, opType: OpTypes.OpType, reason: UnsupportedReasons.UnsupportedReason, - opAction: OpActions.OpAction, - isExpression: Boolean = false) { + opAction: OpActions.OpAction) { val finalOpType: String = if (opType.equals(OpTypes.UDF) || opType.equals(OpTypes.DataSet)) { s"${OpTypes.Exec.toString}" @@ -87,9 +86,11 @@ case class UnsupportedExecSummary( s"${opType.toString}" } - val unsupportedOperator: String = execValue + val unsupportedOperatorCSVFormat: String = execRef.getOpNameCSV val details: String = UnsupportedReasons.reportUnsupportedReason(reason) + + def isExpression: Boolean = execRef.isInstanceOf[UnsupportedExprOpRef] } case class ExecInfo( @@ -124,6 +125,10 @@ case class ExecInfo( def exec: String = execRef.value + def isClusterNode: Boolean = { + execRef.getOpName.contains("StageCodegen") || execRef.getOpName.contains("PhotonResultStage") + } + override def toString: String = { s"exec: $exec, expr: $expr, sqlID: $sqlID , speedupFactor: $speedupFactor, " + s"duration: $duration, nodeId: $nodeId, " + @@ -199,7 +204,7 @@ case class ExecInfo( getUnsupportedReason) // Initialize the result with the exec summary - val res = ArrayBuffer(UnsupportedExecSummary(sqlID, execId, exec, opType, + val res = ArrayBuffer(UnsupportedExecSummary(sqlID, execId, execRef, opType, execUnsupportedReason, getOpAction)) // TODO: Should we iterate on exec children? 
@@ -216,8 +221,8 @@ case class ExecInfo(
     unsupportedExprs.foreach { expr =>
       val exprUnsupportedReason = determineUnsupportedReason(expr.unsupportedReason,
         exprKnownReason)
-      res += UnsupportedExecSummary(sqlID, execId, expr.getOpNameCSV, OpTypes.Expr,
-        exprUnsupportedReason, getOpAction, isExpression = true)
+      res += UnsupportedExecSummary(sqlID, execId, expr, OpTypes.Expr,
+        exprUnsupportedReason, getOpAction)
     }
   }
   res
@@ -345,8 +350,18 @@ case class PlanInfo(
     appID: String,
     sqlID: Long,
     sqlDesc: String,
-    execInfo: Seq[ExecInfo]
-)
+    execInfo: Seq[ExecInfo]) {
+  def getUnsupportedExpressions: Seq[OperatorRefBase] = {
+    execInfo.flatMap { e =>
+      if (e.isClusterNode) {
+        // WholeStageCodegen nodes do not carry expressions/unsupported-expressions of their own.
+        e.children.getOrElse(Seq.empty).flatMap(_.unsupportedExprs)
+      } else {
+        e.unsupportedExprs
+      }
+    }
+  }
+}
 
 object SQLPlanParser extends Logging {
 
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
index a6da873cf..da4e100c9 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
@@ -50,8 +50,6 @@ abstract class WholeStageExecParserBase(
   }
   // if any of the execs in WholeStageCodegen supported mark this entire thing as supported
   val anySupported = childNodes.exists(_.isSupported == true)
-  val unSupportedExprsArray =
-    childNodes.filter(_.unsupportedExprs.nonEmpty).flatMap(x => x.unsupportedExprs).toArray
   // average speedup across the execs in the WholeStageCodegen for now
   val supportedChildren = childNodes.filterNot(_.shouldRemove)
   val avSpeedupFactor = SQLPlanParser.averageSpeedup(supportedChildren.map(_.speedupFactor))
@@ -62,7 +60,8 @@ abstract class WholeStageExecParserBase(
     node.id, anySupported,
     Some(childNodes), stages = stagesInNode,
     shouldRemove = removeNode,
-    unsupportedExprs = unSupportedExprsArray,
+    // unsupported expressions should not be set for the cluster nodes.
+    unsupportedExprs = Seq.empty,
     // expressions of wholeStageCodeGen should not be set. They belong to the children nodes.
     expressions = Seq.empty)
   Seq(execInfo)
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala
new file mode 100644
index 000000000..9e5a76308
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.nvidia.spark.rapids.tool.planparser.ops + +import scala.collection.mutable + +import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo} + +case class OperatorCounter(planInfo: PlanInfo) { + case class OperatorData( + opRef: OperatorRefBase, + var count: Int = 0, + var stages: Set[Int] = Set()) + + case class OperatorCountSummary( + opData: OperatorData, + isSupported: Boolean) + + private val supportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() + private val unsupportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() + + def getOpsCountSummary(): Seq[OperatorCountSummary] = { + supportedMap.values.map(OperatorCountSummary(_, isSupported = true)).toSeq ++ + unsupportedMap.values.map(OperatorCountSummary(_, isSupported = false)).toSeq + } + + private def updateOpRefEntry(opRef: OperatorRefBase, stages: Set[Int], + targetMap: mutable.Map[OperatorRefBase, OperatorData]): Unit = { + val operatorData = targetMap.getOrElseUpdate(opRef, OperatorData(opRef)) + operatorData.count += 1 + operatorData.stages ++= stages + } + + private def processExecInfo(execInfo: ExecInfo): Unit = { + val opMap = execInfo.isSupported match { + case true => supportedMap + case false => unsupportedMap + } + updateOpRefEntry(execInfo.execRef, execInfo.stages, opMap) + // update the map for supported expressions. We should exclude the unsupported expressions. + execInfo.expressions.filterNot( + e => execInfo.unsupportedExprs.exists(exp => exp.opRef.equals(e))).foreach { expr => + updateOpRefEntry(expr, execInfo.stages, supportedMap) + } + // update the map for unsupported expressions + execInfo.unsupportedExprs.foreach { expr => + updateOpRefEntry(expr, execInfo.stages, unsupportedMap) + } + } + + private def countOperators(): Unit = { + planInfo.execInfo.foreach { exec => + exec.isClusterNode match { + // we do not want to count the cluster nodes in that aggregation + case true => + if (exec.children.nonEmpty) { + exec.children.get.foreach { child => + processExecInfo(child) + } + } + case false => processExecInfo(exec) + } + } + } + + countOperators() +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala index ed101d820..9dbe7c920 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.{Buffer, LinkedHashMap, ListBuffer} import com.nvidia.spark.rapids.tool.ToolTextFileWriter import com.nvidia.spark.rapids.tool.planparser.{DatabricksParseHelper, ExecInfo, PlanInfo, UnsupportedExecSummary} +import com.nvidia.spark.rapids.tool.planparser.ops.OperatorCounter import com.nvidia.spark.rapids.tool.profiling.AppStatusResult import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER} @@ -172,6 +173,24 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean, } } + def writeAllOpsSummaryCSVReport( + sums: Seq[QualificationSummaryInfo]): Unit = { + val csvFileWriter = new ToolTextFileWriter(outputDir, + s"${QualOutputWriter.LOGFILE_NAME}_operatorsStats.csv", + "All Operators CSV Report", hadoopConf) + try { + val headersAndSizes = 
QualOutputWriter.getAllOperatorsHeaderStrings
+      csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
+        QualOutputWriter.CSV_DELIMITER))
+      sums.foreach { sum =>
+        QualOutputWriter.constructAllOperatorsInfo(csvFileWriter, sum.planInfo, sum.appId,
+          QualOutputWriter.CSV_DELIMITER)
+      }
+    } finally {
+      csvFileWriter.close()
+    }
+  }
+
   def writePerSqlCSVReport(sums: Seq[QualificationSummaryInfo], maxSQLDescLength: Int): Unit = {
     val csvFileWriter = new ToolTextFileWriter(outputDir,
       s"${QualOutputWriter.LOGFILE_NAME}_persql.csv",
@@ -282,7 +301,7 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
     } else {
       ("", "")
     }
-    Seq(appIDCSVStr, sqlIDStr, StringUtils.reformatCSVString(info.exec),
+    Seq(appIDCSVStr, sqlIDStr, info.execRef.getOpNameCSV,
       StringUtils.reformatCSVString(info.expr),
       if (info.duration.isDefined) info.duration.get.toString else zeroDurationStr,
       info.nodeId.toString,
@@ -445,6 +464,11 @@ object QualOutputWriter {
   val UNSUPPORTED_TASK_DURATION_STR = "Unsupported Task Duration"
   val SUPPORTED_SQL_TASK_DURATION_STR = "Supported SQL DF Task Duration"
   val LONGEST_SQL_DURATION_STR = "Longest SQL Duration"
+  val OPERATOR_TYPE = "Operator Type"
+  val OPERATOR_NAME = "Operator Name"
+  val COUNT = "Count"
+  val IS_SUPPORTED = "Supported"
+  val STAGES = "Stages"
   val EXEC_STR = "Exec Name"
   val EXPR_STR = "Expression Name"
   val EXEC_DURATION = "Exec Duration"
@@ -838,6 +862,49 @@ object QualOutputWriter {
     mutable.LinkedHashMap(headersAndFields: _*)
   }
 
+  private def getAllOperatorsHeaderStrings: LinkedHashMap[String, Int] = {
+    val detailedHeaderAndFields = LinkedHashMap[String, Int](
+      APP_ID_STR -> APP_ID_STR.size,
+      SQL_ID_STR -> SQL_ID_STR.size,
+      OPERATOR_TYPE -> OPERATOR_TYPE.size,
+      OPERATOR_NAME -> OPERATOR_NAME.size,
+      COUNT -> COUNT.size,
+      IS_SUPPORTED -> IS_SUPPORTED.size,
+      STAGES -> STAGES.size
+    )
+    detailedHeaderAndFields
+  }
+
+  private def constructAllOperatorsInfo(
+      csvWriter: ToolTextFileWriter,
+      planInfos: Seq[PlanInfo],
+      appId: String,
+      delimiter: String): Unit = {
+    // Iterates over the PlanInfos (sorted by SqlID), builds the operator counts per SQL plan,
+    // and dumps them to the CSV as one buffered string instead of writing one row at a time.
+ val appIDCSVStr = StringUtils.reformatCSVString(appId) + val supportedCSVStr = "true" + val unsupportedCSVStr = "false" + planInfos.foreach { planInfo => + val sqlIDCSVStr = planInfo.sqlID.toString + val allOpsCount = OperatorCounter(planInfo) + .getOpsCountSummary().sortBy(oInfo => (-oInfo.opData.count, oInfo.opData.opRef.getOpName)) + if (allOpsCount.nonEmpty) { + val planBuffer = allOpsCount.map { opInfo => + val supportFlag = if (opInfo.isSupported) supportedCSVStr else unsupportedCSVStr + val stageStr = StringUtils.reformatCSVString(opInfo.opData.stages.mkString(":")) + s"$appIDCSVStr$delimiter" + + s"$sqlIDCSVStr$delimiter" + + s"${opInfo.opData.opRef.getOpTypeCSV}$delimiter" + + s"${opInfo.opData.opRef.getOpNameCSV}$delimiter${opInfo.opData.count}$delimiter" + + s"$supportFlag$delimiter" + + s"$stageStr" + } + csvWriter.write(s"${planBuffer.mkString("\n")}\n") + } + } + } + def constructClusterInfo( sumInfo: QualificationSummaryInfo, headersAndSizes: LinkedHashMap[String, Int], @@ -940,7 +1007,7 @@ object QualOutputWriter { // need to remove the WholeStageCodegen wrappers since they aren't actual // execs that we want to get timings of execs.flatMap { e => - if (e.exec.contains("WholeStageCodegen")) { + if (e.isClusterNode) { e.children.getOrElse(Seq.empty) } else { e.children.getOrElse(Seq.empty) :+ e @@ -1048,7 +1115,7 @@ object QualOutputWriter { stageId.toString -> headersAndSizes(STAGE_ID_STR), reformatCSVFunc(unSupExecInfo.execId.toString) -> headersAndSizes(EXEC_ID), reformatCSVFunc(unSupExecInfo.finalOpType) -> headersAndSizes(UNSUPPORTED_TYPE), - reformatCSVFunc(unSupExecInfo.unsupportedOperator) -> headersAndSizes(UNSUPPORTED_OPERATOR), + unSupExecInfo.unsupportedOperatorCSVFormat -> headersAndSizes(UNSUPPORTED_OPERATOR), reformatCSVFunc(unSupExecInfo.details) -> headersAndSizes(DETAILS), stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR), appDuration.toString -> headersAndSizes(APP_DUR_STR), diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index 49d70b80e..fa59e8b3e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -255,6 +255,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration, qWriter.writeExecReport(allAppsSum) qWriter.writeStageReport(allAppsSum, order) qWriter.writeUnsupportedOpsSummaryCSVReport(allAppsSum) + qWriter.writeAllOpsSummaryCSVReport(allAppsSum) val appStatusResult = generateStatusResults(appStatusReporter.asScala.values.toSeq) logOutputPath() qWriter.writeStatusReport(appStatusResult, order) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 9c291114d..1ef8b7315 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -281,7 +281,7 @@ class QualificationAppInfo( // need to remove the WholeStageCodegen wrappers since they aren't actual // execs that we want to get timings of execs.flatMap { e => - if (e.exec.contains("WholeStageCodegen")) { + if (e.isClusterNode) { e.children.getOrElse(Seq.empty) } else { 
e.children.getOrElse(Seq.empty) :+ e @@ -573,7 +573,7 @@ class QualificationAppInfo( val unSupportedExecs = planInfos.flatMap { p => // WholeStageCodeGen is excluded from the result. val topLevelExecs = p.execInfo.filterNot(_.isSupported).filterNot( - x => x.exec.startsWith("WholeStage")) + x => x.isClusterNode) val childrenExecs = p.execInfo.flatMap { e => e.children.map(x => x.filterNot(_.isSupported)) }.flatten @@ -581,9 +581,8 @@ class QualificationAppInfo( }.map(_.exec).toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":") // Get all the unsupported Expressions from the plan - val unSupportedExprs = origPlanInfos.map(_.execInfo.flatMap( - _.unsupportedExprs.map(_.getOpName))).flatten.filter(_.nonEmpty).toSet.mkString(";") - .trim.replaceAll("\n", "").replace(",", ":") + val unSupportedExprs = origPlanInfos.flatMap(p => p.getUnsupportedExpressions) + .map(s => s.getOpName).toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":") // TODO - this is not correct as this is using the straight stage wall // clock time and hasn't been adjusted to the app duration wall clock From d86283874a6cc42e94d0a05f0f0b077938255bff Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Tue, 3 Dec 2024 11:21:41 -0600 Subject: [PATCH 3/8] cleaned-up exec parsers Signed-off-by: Ahmed Hussein (amahussein) --- .../tool/planparser/BatchScanExecParser.scala | 26 ++++++++++++--- .../BroadcastExchangeExecParser.scala | 1 - .../BroadcastHashJoinExecParser.scala | 3 +- .../BroadcastNestedLoopJoinExecParser.scala | 3 +- .../DataWritingCommandExecParser.scala | 1 - .../tool/planparser/DeltaLakeHelper.scala | 2 +- .../planparser/FileSourceScanExecParser.scala | 33 ++++++++++++++++--- .../rapids/tool/planparser/ReadParser.scala | 8 +++++ .../ShuffleExchangeExecParser.scala | 1 - .../ShuffledHashJoinExecParser.scala | 2 -- .../planparser/SortMergeJoinExecParser.scala | 1 - .../SubqueryBroadcastExecParser.scala | 1 - .../planparser/WholeStageExecParser.scala | 27 +++++++++++---- .../planparser/WindowGroupLimitParser.scala | 2 +- .../planparser/WriteFilesExecParser.scala | 5 ++- .../db_sim_test_expectation.csv | 2 +- .../directory_test_expectation.csv | 2 +- .../jdbc_expectation.csv | 2 +- .../qual_test_missing_sql_end_expectation.csv | 2 +- .../qual_test_simple_expectation.csv | 2 +- .../truncated_1_end_expectation.csv | 2 +- .../tool/planparser/SqlPlanParserSuite.scala | 14 ++++---- .../qualification/QualificationSuite.scala | 4 +-- 23 files changed, 102 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala index ac126396b..7c591b372 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala @@ -27,8 +27,8 @@ case class BatchScanExecParser( checker: PluginTypeChecker, sqlID: Long, app: AppBase) extends ExecParser with Logging { - - val fullExecName = "BatchScanExec" + val nodeName = "BatchScan" + val fullExecName = nodeName + "Exec" override def parse: ExecInfo = { val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId) @@ -39,9 +39,25 @@ case class BatchScanExecParser( val speedupFactor = checker.getSpeedupFactor(fullExecName) val overallSpeedup = Math.max((speedupFactor * score), 1.0) - // TODO - add in parsing expressions - average speedup across? 
- ExecInfo(node, sqlID, s"${node.name} ${readInfo.format}", s"Format: ${readInfo.format}", - overallSpeedup, maxDuration, node.id, score > 0, + // 1- Set the exec name to be the batchScan + format + // 2- If the format cannot be found, then put the entire node description to make it easy to + // troubleshoot by reading the output files. + val readFormat = readInfo.getReadFormatLC + val execExpression = if (readInfo.hasUnknownFormat) { + node.desc + } else { + s"Format: $readFormat" + } + + ExecInfo.createExecNoNode( + sqlID = sqlID, + exec = s"$nodeName $readFormat", + expr = execExpression, + speedupFactor = overallSpeedup, + duration = maxDuration, + nodeId = node.id, + opType = OpTypes.ReadExec, + isSupported = score > 0.0, children = None, expressions = Seq.empty) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala index ea06cf7ab..e6bbc99d8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala @@ -44,7 +44,6 @@ case class BroadcastExchangeExecParser( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, children = None, expressions = Seq.empty) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala index 4c3a6fbd1..8acd2db7b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala @@ -30,7 +30,7 @@ case class BroadcastHashJoinExecParser( override def parse: ExecInfo = { // BroadcastHashJoin doesn't have duration val duration = None - val exprString = node.desc.replaceFirst("BroadcastHashJoin ", "") + val exprString = node.desc.replaceFirst("^BroadcastHashJoin\\s*", "") val (expressions, supportedJoinType) = SQLPlanParser.parseEquijoinsExpressions(exprString) val notSupportedExprs = expressions.filterNot(expr => checker.isExprSupported(expr)) val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && @@ -39,7 +39,6 @@ case class BroadcastHashJoinExecParser( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? 
ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, children = None, expressions = expressions) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala index 488f1118e..f6be5decd 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala @@ -39,7 +39,7 @@ abstract class BroadcastNestedLoopJoinExecParserBase( override def parse: ExecInfo = { // BroadcastNestedLoopJoin doesn't have duration - val exprString = node.desc.replaceFirst("BroadcastNestedLoopJoin ", "") + val exprString = node.desc.replaceFirst("^BroadcastNestedLoopJoin\\s*", "") val (buildSide, joinType) = extractBuildAndJoinTypes(exprString) val (expressions, supportedJoinType) = SQLPlanParser.parseNestedLoopJoinExpressions(exprString, buildSide, joinType) @@ -51,7 +51,6 @@ abstract class BroadcastNestedLoopJoinExecParserBase( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, children = None, expressions = expressions) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala index 5b799c2d8..86e553ac1 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala @@ -37,7 +37,6 @@ case class DataWritingCommandExecParser( val duration = None val speedupFactor = checker.getSpeedupFactor(wStub.mappedExec) val finalSpeedup = if (writeSupported) speedupFactor else 1 - // TODO - add in parsing expressions - average speedup across? // We do not want to parse the node description to avoid mistakenly marking the node as RDD/UDF ExecInfo.createExecNoNode(sqlID, s"${node.name.trim} ${wStub.dataFormat.toLowerCase.trim}", s"Format: ${wStub.dataFormat.toLowerCase.trim}", diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala index 5d25c1705..884ad8907 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala @@ -45,7 +45,7 @@ class DLWriteWithFormatAndSchemaParser(node: SparkPlanGraphNode, val finalSpeedupFactor = if (writeSupported) speedupFactor else 1.0 // execs like SaveIntoDataSourceCommand has prefix "Execute". So, we need to get rid of it. 
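
// Illustrative sketch (not part of this patch) of the "Execute" prefix stripping applied just
// below for Delta Lake write commands. The node names here are hypothetical examples of the
// "Execute <Command>" naming convention.
object ExecutePrefixSketch {
  def nodeLabel(nodeName: String): String = nodeName.replaceFirst("Execute\\s*", "")

  def main(args: Array[String]): Unit = {
    assert(nodeLabel("Execute SaveIntoDataSourceCommand") == "SaveIntoDataSourceCommand")
    // names without the prefix pass through unchanged
    assert(nodeLabel("AppendDataExecV1") == "AppendDataExecV1")
  }
}
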
-    val nodeName = node.name.replace("Execute ", "")
+    val nodeName = node.name.replaceFirst("Execute\\s*", "")
     ExecInfo.createExecNoNode(sqlID, nodeName, s"Format: $dataFormat", finalSpeedupFactor, None,
       node.id, OpTypes.WriteExec, isSupported = writeSupported && isExecSupported,
       children = None, expressions = Seq.empty)
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala
index bcda8a1a9..520d05832 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala
@@ -30,6 +30,9 @@ case class FileSourceScanExecParser(
   // The node name for Scans is Scan so here we hardcode
   val fullExecName = "FileSourceScanExec"
+  // Matches the first alphanumeric characters of a string after trimming leading/trailing
+  // white spaces.
+  val nodeNameRegeX = """^\s*(\w+).*""".r

   override def parse: ExecInfo = {
     // Remove trailing spaces from node name
@@ -37,7 +40,7 @@ case class FileSourceScanExecParser(
     val nodeName = node.name.trim
     val rddCheckRes = RDDCheckHelper.isDatasetOrRDDPlan(nodeName, node.desc)
     if (rddCheckRes.nodeNameRDD) {
-      // This is a scanRDD. we do not need to parse it as a normal node.
+      // This is a scanRDD. We do not need to parse it as a normal node.
       // cleanup the node name if possible:
       val newNodeName = if (nodeName.contains("ExistingRDD")) {
         val nodeNameLength = nodeName.indexOf("ExistingRDD") + "ExistingRDD".length
@@ -57,14 +60,36 @@ case class FileSourceScanExecParser(
       // Use the default parser
       (fullExecName, ReadParser.parseReadNode(node))
     }
+    // 1- Set the exec name to nodeLabel + format
+    // 2- If the format is not found, then put the entire node description to make it easy to
+    //    troubleshoot by reading the output files.
+    val nodeLabel = nodeNameRegeX.findFirstMatchIn(nodeName) match {
+      case Some(m) => m.group(1)
+      // in case not found, use the full exec name
+      case None => execName
+    }
+    val readFormat = readInfo.getReadFormatLC
+    val execExpr = if (readInfo.hasUnknownFormat) {
+      node.desc
+    } else {
+      s"Format: ${readFormat}"
+    }
     val speedupFactor = checker.getSpeedupFactor(execName)
     // don't use the isExecSupported because we have finer grain.
     val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
     val overallSpeedup = Math.max(speedupFactor * score, 1.0)
-    // TODO - add in parsing expressions - average speedup across?
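
// Illustrative sketch (not part of this patch): the first-word regex defined above, used to
// derive a stable node label from a scan node name. The sample node names are made up.
object NodeLabelSketch {
  val nodeNameRegex = """^\s*(\w+).*""".r

  def label(nodeName: String, fallback: String): String =
    nodeNameRegex.findFirstMatchIn(nodeName) match {
      case Some(m) => m.group(1)
      case None => fallback
    }

  def main(args: Array[String]): Unit = {
    assert(label("Scan parquet spark_catalog.db.tbl", "FileSourceScanExec") == "Scan")
    assert(label("  GpuScan parquet", "FileSourceScanExec") == "GpuScan")
    // nothing word-like at the start: fall back to the full exec name
    assert(label("***", "FileSourceScanExec") == "FileSourceScanExec")
  }
}
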
-    ExecInfo.createExecNoNode(sqlID, nodeName, "", overallSpeedup, maxDuration,
-      node.id, OpTypes.ReadExec, score > 0, children = None, expressions = Seq.empty)
+    ExecInfo.createExecNoNode(
+      sqlID = sqlID,
+      exec = s"$nodeLabel $readFormat",
+      expr = execExpr,
+      speedupFactor = overallSpeedup,
+      duration = maxDuration,
+      nodeId = node.id,
+      opType = OpTypes.ReadExec,
+      isSupported = score > 0,
+      children = None,
+      expressions = Seq.empty)
     }
   }
 }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala
index 131d6b2d3..a87ed67d1 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala
@@ -29,6 +29,14 @@ case class ReadMetaData(schema: String, location: String, format: String,
   def pushedFilters: String = tags(ReadParser.METAFIELD_TAG_PUSHED_FILTERS)
   def dataFilters: String = tags(ReadParser.METAFIELD_TAG_DATA_FILTERS)
   def partitionFilters: String = tags(ReadParser.METAFIELD_TAG_PARTITION_FILTERS)
+
+  def hasUnknownFormat: Boolean = format.equals(ReadParser.UNKNOWN_METAFIELD)
+
+  /**
+   * Returns the read format in lowercase so that format names are reported consistently.
+   * @return the read format in lowercase
+   */
+  def getReadFormatLC: String = format.toLowerCase
 }

 object ReadParser extends Logging {
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala
index f79ffeff0..2026a2a17 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala
@@ -44,7 +44,6 @@ case class ShuffleExchangeExecParser(
     } else {
       (1.0, false)
     }
-    // TODO - add in parsing expressions - average speedup across?
     ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor,
       duration, node.id, isSupported, children = None, expressions = Seq.empty)
   }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala
index 303705de5..4c90928a4 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala
@@ -43,8 +43,6 @@ case class ShuffledHashJoinExecParser(
     } else {
       (1.0, false)
     }
-
-    // TODO - add in parsing expressions - average speedup across?
     ExecInfo(node, sqlID, node.name, "", speedupFactor, maxDuration, node.id, isSupported,
       children = None, expressions = expressions)
   }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala
index d85123afe..4828d8ac9 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala
@@ -42,7 +42,6 @@ case class SortMergeJoinExecParser(
     } else {
       (1.0, false)
     }
-    // TODO - add in parsing expressions - average speedup across?
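
// Illustrative sketch (not part of this patch) of the two ReadMetaData helpers added to
// ReadParser.scala earlier in this patch. `Meta` is a simplified stand-in for ReadMetaData
// and "unknown" stands in for ReadParser.UNKNOWN_METAFIELD.
object ReadMetaSketch {
  case class Meta(format: String) {
    def getReadFormatLC: String = format.toLowerCase
    def hasUnknownFormat: Boolean = format.equals("unknown")
  }

  def main(args: Array[String]): Unit = {
    assert(Meta("Parquet").getReadFormatLC == "parquet") // reported as "Scan parquet"
    assert(Meta("JDBC").getReadFormatLC == "jdbc")       // reported as "Scan jdbc"
    assert(Meta("unknown").hasUnknownFormat)             // falls back to the node description
  }
}
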
ExecInfo(node, sqlID, opName, "", speedupFactor, duration, node.id, isSupported, children = None, expressions = expressions) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala index 45fabc47e..048a317b0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala @@ -38,7 +38,6 @@ case class SubqueryBroadcastExecParser( } else { (1.0, false) } - // TODO - check is broadcast associated can be replaced ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, children = None, expressions = Seq.empty) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala index da4e100c9..d0ff5c4ff 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala @@ -31,6 +31,9 @@ abstract class WholeStageExecParserBase( nodeIdToStagesFunc: Long => Set[Int]) extends Logging { val fullExecName = "WholeStageCodegenExec" + // Matches the first alphaneumeric characters of a string after trimming leading/trailing + // white spaces. + val nodeNameRegeX = """^\s*(\w+).*""".r def parse: Seq[ExecInfo] = { // TODO - does metrics for time have previous ops? per op thing, only some do @@ -50,19 +53,31 @@ abstract class WholeStageExecParserBase( } // if any of the execs in WholeStageCodegen supported mark this entire thing as supported val anySupported = childNodes.exists(_.isSupported == true) -// val unSupportedExprsArray = -// childNodes.filter(_.unsupportedExprs.nonEmpty).flatMap(x => x.unsupportedExprs).toArray // average speedup across the execs in the WholeStageCodegen for now val supportedChildren = childNodes.filterNot(_.shouldRemove) val avSpeedupFactor = SQLPlanParser.averageSpeedup(supportedChildren.map(_.speedupFactor)) // The node should be marked as shouldRemove when all the children of the // wholeStageCodeGen are marked as shouldRemove. val removeNode = isDupNode || childNodes.forall(_.shouldRemove) - val execInfo = ExecInfo(node, sqlID, node.name, node.name, avSpeedupFactor, maxDuration, - node.id, anySupported, - Some(childNodes), stages = stagesInNode, + // Remove any suffix in order to get the node label without any trailing number. + val nodeLabel = nodeNameRegeX.findFirstMatchIn(node.name) match { + case Some(m) => m.group(1) + // in case not found, use the full exec name + case None => fullExecName + } + val execInfo = ExecInfo( + node = node, + sqlID = sqlID, + exec = nodeLabel, + expr = node.name, + speedupFactor = avSpeedupFactor, + duration = maxDuration, + nodeId = node.id, + isSupported = anySupported, + children = Some(childNodes), + stages = stagesInNode, shouldRemove = removeNode, - // unsupportyed expressions should not be set for the cluster nodes. + // unsupported expressions should not be set for the cluster nodes. unsupportedExprs = Seq.empty, // expressions of wholeStageCodeGen should not be set. They belong to the children nodes. 
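
// Illustrative sketch (not part of this patch): deriving the cluster-node label above from a
// typical "WholeStageCodegen (3)" node name, plus the average-speedup rule over the children.
// averageSpeedup below is an assumption mirroring the intent of SQLPlanParser.averageSpeedup.
object WholeStageLabelSketch {
  val nodeNameRegex = """^\s*(\w+).*""".r

  def label(name: String): String =
    nodeNameRegex.findFirstMatchIn(name).map(_.group(1)).getOrElse("WholeStageCodegenExec")

  def averageSpeedup(factors: Seq[Double]): Double =
    if (factors.isEmpty) 1.0 else factors.sum / factors.size

  def main(args: Array[String]): Unit = {
    assert(label("WholeStageCodegen (3)") == "WholeStageCodegen") // trailing "(3)" dropped
    assert(averageSpeedup(Seq(3.0, 1.0)) == 2.0)
  }
}
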
expressions = Seq.empty) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala index 89f76e6c9..853c1b767 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala @@ -57,7 +57,7 @@ case class WindowGroupLimitParser( * 3. Ranking function is supported by plugin's implementation of WindowGroupLimitExec. */ override def parse: ExecInfo = { - val exprString = node.desc.replaceFirst("WindowGroupLimit ", "") + val exprString = node.desc.replaceFirst("WindowGroupLimit\\s*", "") val expressions = SQLPlanParser.parseWindowGroupLimitExpressions(exprString) val notSupportedExprs = checker.getNotSupportedExprs(expressions) ++ getUnsupportedExprReasonsForExec(expressions) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala index d226df19c..0fa6cb254 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala @@ -44,7 +44,10 @@ case class WriteFilesExecParser( "", speedupFactor, duration, - node.id, opType = OpTypes.WriteExec, true, children = None, expressions = Seq.empty) + node.id, opType = OpTypes.WriteExec, + isSupported = true, + children = None, + expressions = Seq.empty) } } diff --git a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv index 5c618e929..35d5d6f1b 100644 --- a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds -"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan;SerializeFromObject","",30,1599 +"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan unknown;SerializeFromObject","",30,1599 diff --git a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv index 5c618e929..35d5d6f1b 100644 --- a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest 
SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds -"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan;SerializeFromObject","",30,1599 +"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan unknown;SerializeFromObject","",30,1599 diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv index df554c136..50e8986ba 100644 --- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds -"Spark shell","app-20211019113801-0001",2942,19894,571967,2814,28.41,"","JDBC[*]","","","","",1812,2883,569025,859,19035,false,"CollectLimit;Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand","",30,9110 +"Spark shell","app-20211019113801-0001",2942,19894,571967,2814,28.41,"","JDBC[*]","","","","",1812,2883,569025,859,19035,false,"CollectLimit;Scan jdbc;Execute CreateViewCommand","",30,9110 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv index 2561e6ca5..428f89654 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds -"Rapids Spark Profiling Tool Unit Tests","local-1622561780883",0,40448,7673,0,55.94,"","","","","","",0,5000,7673,8096,32352,false,"Scan;SerializeFromObject","",30,82 +"Rapids Spark Profiling Tool Unit Tests","local-1622561780883",0,40448,7673,0,55.94,"","","","","","",0,5000,7673,8096,32352,false,"Scan unknown;SerializeFromObject","",30,82 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv index a3a847ad0..62421d9bf 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv +++ 
b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv @@ -1,5 +1,5 @@ App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",11600,132257,16319,9868,37.7,"","","JSON","","","",7143,13770,4719,19744,112513,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1,186 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",11600,132257,16319,9868,37.7,"","","JSON","","","",7143,13770,4719,19744,112513,false,"SerializeFromObject;Scan unknown;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements","",1,186 "Spark shell","local-1651187225439",224,180,355637,74,87.88,"","JSON[string:bigint:int]","","","","",498,228,355101,120,60,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1,2834 "Spark shell","local-1651188809790",347,283,166215,14,81.18,"","JSON[string:bigint:int]","","","","UDF",715,318,165572,271,12,false,"CollectLimit;Scan json;Project","UDF",1,1318 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",1156,4666,6240,1,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,1130,5809,4661,5,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1,64 diff --git a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv index f7737d508..79cb8dede 100644 --- a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",395,14353,4872,164,62.67,"","","JSON","","","",1306,794,4477,8376,5977,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30,49 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",395,14353,4872,164,62.67,"","","JSON","","","",1306,794,4477,8376,5977,true,"SerializeFromObject;Scan unknown;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements","",30,49 diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index b51250827..e31064851 100644 --- 
a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -276,12 +276,12 @@ class SQLPlanParserSuite extends BasePlanParserSuite { val parquet = allExecInfo.filter(_.exec.contains("Scan parquet")) val text = allExecInfo.filter(_.exec.contains("Scan text")) val csv = allExecInfo.filter(_.exec.contains("Scan csv")) - assertSizeAndNotSupported(2, json.toSeq) - assertSizeAndNotSupported(1, text.toSeq) + assertSizeAndNotSupported(2, json) + assertSizeAndNotSupported(1, text) for (t <- Seq(parquet, csv)) { - assertSizeAndSupported(1, t.toSeq) + assertSizeAndSupported(1, t) } - assertSizeAndSupported(2, orc.toSeq) + assertSizeAndSupported(2, orc) } test("BatchScan") { @@ -299,10 +299,10 @@ class SQLPlanParserSuite extends BasePlanParserSuite { val orc = allExecInfo.filter(_.exec.contains("BatchScan orc")) val parquet = allExecInfo.filter(_.exec.contains("BatchScan parquet")) val csv = allExecInfo.filter(_.exec.contains("BatchScan csv")) - assertSizeAndNotSupported(3, json.toSeq) - assertSizeAndSupported(1, csv.toSeq) + assertSizeAndNotSupported(3, json) + assertSizeAndSupported(1, csv) for (t <- Seq(orc, parquet)) { - assertSizeAndSupported(2, t.toSeq) + assertSizeAndSupported(2, t) } } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 57b6f1cb3..03943d463 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -1066,7 +1066,7 @@ class QualificationSuite extends BaseTestSuite { val stdOutunsupportedExecs = stdOutValues(stdOutValues.length - 3) // index of unsupportedExprs val stdOutunsupportedExprs = stdOutValues(stdOutValues.length - 2) - val expectedstdOutExecs = "Scan;Filter;SerializeF..." + val expectedstdOutExecs = "Scan unknown;Filter;Se..." 
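
// Illustrative check (not part of this patch): with UNSUPPORTED_EXECS_MAX_SIZE = 25, the new
// full value "Scan unknown;Filter;SerializeFromObject" is clipped to 22 characters plus "...",
// which is exactly the 25-character string asserted below. The clipping rule used here is an
// assumption inferred from the expected value.
object StdoutTruncationSketch {
  def clip(s: String, max: Int): String = if (s.length <= max) s else s.take(max - 3) + "..."

  def main(args: Array[String]): Unit = {
    assert(clip("Scan unknown;Filter;SerializeFromObject", 25) == "Scan unknown;Filter;Se...")
  }
}
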
      assert(stdOutunsupportedExecs == expectedstdOutExecs)
      // Exec value is Scan;Filter;SerializeFromObject and UNSUPPORTED_EXECS_MAX_SIZE is 25
      val expectedStdOutExecsMaxLength = 25
@@ -1096,7 +1096,7 @@ class QualificationSuite extends BaseTestSuite {
       val rows = outputActual.collect()
       assert(rows.size == 1)

-      val expectedExecs = "Scan;Filter;SerializeFromObject" // Unsupported Execs
+      val expectedExecs = "Scan unknown;Filter;SerializeFromObject" // Unsupported Execs
       val expectedExprs = "hex" //Unsupported Exprs
       val unsupportedExecs =
         outputActual.select(QualOutputWriter.UNSUPPORTED_EXECS).first.getString(0)

From 106305837b97b3a44b09c4bde87692c062931669 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Tue, 3 Dec 2024 13:28:10 -0800
Subject: [PATCH 4/8] update documentation and fix

Signed-off-by: Niranjan Artal
---
 .../planparser/WholeStageExecParser.scala     |  2 +-
 .../rapids/tool/planparser/ops/OpRef.scala    | 16 ++++++++++++
 .../tool/planparser/ops/OperatorCounter.scala | 25 +++++++++++++++++++
 .../tool/planparser/ops/OperatorRefBase.scala | 10 ++++++++
 .../planparser/ops/UnsupportedExprOpRef.scala | 16 ++++++++++++
 5 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
index d0ff5c4ff..5b8730938 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
@@ -31,7 +31,7 @@ abstract class WholeStageExecParserBase(
     nodeIdToStagesFunc: Long => Set[Int]) extends Logging {

   val fullExecName = "WholeStageCodegenExec"
-  // Matches the first alphaneumeric characters of a string after trimming leading/trailing
+  // Matches the first alphanumeric characters of a string after trimming leading/trailing
   // white spaces.
   val nodeNameRegeX = """^\s*(\w+).*""".r

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala
index 18c6613a9..845efa696 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala
@@ -21,6 +21,12 @@ import java.util.concurrent.ConcurrentHashMap

 import com.nvidia.spark.rapids.tool.planparser.OpTypes

+/**
+ * A reference to an operator (either an exec operator or an expression operator) in a Spark plan.
+ *
+ * @param value The name of the operator.
+ * @param opType The type of the operator (e.g., Exec, Expr).
+ */
 case class OpRef(override val value: String,
     override val opType: OpTypes.OpType) extends OperatorRefBase(value, opType)

@@ -38,10 +44,20 @@ object OpRef {
     initMap
   }

+  /**
+   * Retrieves an `OpRef` for an expression operator.
+   * If the operator name already exists in the cache, it returns the existing `OpRef`.
+   * Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Expr`.
+   */
   def fromExpr(name: String): OpRef = {
     OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Expr))
   }

+  /**
+   * Retrieves an `OpRef` for an exec operator.
+   * If the operator name already exists in the cache, it returns the existing `OpRef`.
+   * Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Exec`.
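
// Illustrative sketch (not part of this patch): the OP_NAMES cache above interns one OpRef per
// operator name, so repeated lookups return the same instance. `Ref` and the String opType are
// simplified stand-ins for OpRef and OpTypes.OpType.
import java.util.concurrent.ConcurrentHashMap

object OpRefCacheSketch {
  case class Ref(value: String, opType: String)
  private val names = new ConcurrentHashMap[String, Ref]()

  def fromExec(name: String): Ref = names.computeIfAbsent(name, Ref(_, "Exec"))

  def main(args: Array[String]): Unit = {
    // same name, same cached instance (reference equality), so no duplicate strings are kept
    assert(fromExec("Filter") eq fromExec("Filter"))
  }
}
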
+ */ def fromExec(name: String): OpRef = { OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Exec)) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala index 9e5a76308..fab9422aa 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala @@ -20,12 +20,29 @@ import scala.collection.mutable import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo} +/** + * `OperatorCounter` is responsible for counting the occurrences of execs and expressions + * in a given execution plan (`PlanInfo`). It maintains counts separately for supported and + * unsupported execs and expressions. + * + * @param planInfo The execution plan information to analyze. + */ case class OperatorCounter(planInfo: PlanInfo) { + + /** + * Represents data for an exec or expression, including its reference, + * occurrence count, and stages where it appears. + * + * @param opRef The operator reference. + * @param count The number of times the operator appears. + * @param stages The set of stages where the operator appears. + */ case class OperatorData( opRef: OperatorRefBase, var count: Int = 0, var stages: Set[Int] = Set()) + // Summarizes the count information for an exec or expression, including whether it is supported. case class OperatorCountSummary( opData: OperatorData, isSupported: Boolean) @@ -33,11 +50,16 @@ case class OperatorCounter(planInfo: PlanInfo) { private val supportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() private val unsupportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() + // Returns a sequence of `OperatorCountSummary`, combining both supported and + // unsupported operators. def getOpsCountSummary(): Seq[OperatorCountSummary] = { supportedMap.values.map(OperatorCountSummary(_, isSupported = true)).toSeq ++ unsupportedMap.values.map(OperatorCountSummary(_, isSupported = false)).toSeq } + + // Updates the operator data in the given map (supported or unsupported). + // Increments the count and updates the stages where the operator appears. private def updateOpRefEntry(opRef: OperatorRefBase, stages: Set[Int], targetMap: mutable.Map[OperatorRefBase, OperatorData]): Unit = { val operatorData = targetMap.getOrElseUpdate(opRef, OperatorData(opRef)) @@ -45,6 +67,8 @@ case class OperatorCounter(planInfo: PlanInfo) { operatorData.stages ++= stages } + // Processes an `ExecInfo` node to update exec and expression counts. + // Separates supported and unsupported execs and expressions into their respective maps. private def processExecInfo(execInfo: ExecInfo): Unit = { val opMap = execInfo.isSupported match { case true => supportedMap @@ -62,6 +86,7 @@ case class OperatorCounter(planInfo: PlanInfo) { } } + // Counts the execs and expressions in the execution plan. 
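
// Illustrative sketch (not part of this patch): the count/stage accumulation pattern used by
// updateOpRefEntry above, shown with a plain String key instead of OperatorRefBase.
import scala.collection.mutable

object OperatorCountSketch {
  case class Data(var count: Int = 0, var stages: Set[Int] = Set())
  private val target: mutable.Map[String, Data] = mutable.Map()

  def update(op: String, stages: Set[Int]): Unit = {
    val d = target.getOrElseUpdate(op, Data())
    d.count += 1
    d.stages ++= stages
  }

  def main(args: Array[String]): Unit = {
    update("Filter", Set(1))
    update("Filter", Set(2))
    assert(target("Filter").count == 2 && target("Filter").stages == Set(1, 2))
  }
}
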
private def countOperators(): Unit = { planInfo.execInfo.foreach { exec => exec.isClusterNode match { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala index c4c54a692..04bd9a190 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala @@ -20,7 +20,17 @@ import com.nvidia.spark.rapids.tool.planparser.OpTypes import org.apache.spark.sql.rapids.tool.util.StringUtils +/** + * Base class representing a reference to an operator (either exec operator or expression operator). + * It provides methods to retrieve the operator's name and type in both raw and + * CSV-friendly formats. + * + * @param value The name of the operator. + * @param opType The type of the operator (e.g., Exec, Expr). + */ + class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait { + // Preformatted values for CSV output to avoid reformatting multiple times. val csvValue: String = StringUtils.reformatCSVString(value) val csvOpType: String = StringUtils.reformatCSVString(opType.toString) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala index ef03faa64..0e17d8f0d 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala @@ -16,10 +16,26 @@ package com.nvidia.spark.rapids.tool.planparser.ops + +/** + * Represents a reference to an unsupported expression operator. + * Extends `OperatorRefBase` and includes a reason why the expression is unsupported. + * + * @param opRef The underlying `OpRef` for the expression. + * @param unsupportedReason A string describing why the expression is unsupported. + */ case class UnsupportedExprOpRef(opRef: OpRef, unsupportedReason: String) extends OperatorRefBase(opRef.value, opRef.opType) +// Provides a factory method to create an instance from an expression name and unsupported reason. object UnsupportedExprOpRef { + /** + * Creates an `UnsupportedExprOpRef` for the given expression name and unsupported reason. + * + * @param exprName The name of the unsupported expression. + * @param unsupportedReason A string describing why the expression is unsupported. + * @return An instance of `UnsupportedExprOpRef`. 
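
// Illustrative sketch (not part of this patch): the factory below reuses the interned OpRef for
// the expression name, so wrappers that differ only in reason share one name reference. `Ref`
// and `Unsupported` are simplified stand-ins for OpRef and UnsupportedExprOpRef.
object UnsupportedExprSketch {
  case class Ref(value: String)
  private val cache = new java.util.concurrent.ConcurrentHashMap[String, Ref]()
  def fromExpr(name: String): Ref = cache.computeIfAbsent(name, Ref(_))

  case class Unsupported(opRef: Ref, reason: String)

  def main(args: Array[String]): Unit = {
    val a = Unsupported(fromExpr("hex"), "hex is not supported")
    val b = Unsupported(fromExpr("hex"), "expression not supported on GPU")
    assert(a.opRef eq b.opRef) // same interned reference for the same expression name
  }
}
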
+ */ def apply(exprName: String, unsupportedReason: String): UnsupportedExprOpRef = { val opRef = OpRef.fromExpr(exprName) UnsupportedExprOpRef(opRef, unsupportedReason) From 1d1f0fb167ca88d83dab6b1f70311aadc8603ffe Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Dec 2024 15:33:12 -0800 Subject: [PATCH 5/8] qualx related changes --- .../tools/qualx/preprocess.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py index 320615b7c..2961f9f90 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py @@ -109,6 +109,10 @@ 'sqlOp_DeserializeToObject', 'sqlOp_Exchange', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand', + 'sqlOp_Execute InsertIntoHadoopFsRelationCommand csv', + 'sqlOp_Execute InsertIntoHadoopFsRelationCommand parquet', + 'sqlOp_Execute InsertIntoHadoopFsRelationCommand orc', + 'sqlOp_Execute InsertIntoHadoopFsRelationCommand json', 'sqlOp_Expand', 'sqlOp_Filter', 'sqlOp_Generate', @@ -125,15 +129,16 @@ 'sqlOp_Project', 'sqlOp_ReusedSort', 'sqlOp_RunningWindowFunction', - 'sqlOp_Scan csv ', + 'sqlOp_Scan csv', 'sqlOp_Scan ExistingRDD Delta Table Checkpoint', 'sqlOp_Scan ExistingRDD Delta Table State', - 'sqlOp_Scan JDBCRelation', - 'sqlOp_Scan json ', + 'sqlOp_Scan ExistingRDD', + 'sqlOp_Scan jdbc', + 'sqlOp_Scan json', 'sqlOp_Scan OneRowRelation', - 'sqlOp_Scan orc ', - 'sqlOp_Scan parquet ', - 'sqlOp_Scan text ', + 'sqlOp_Scan orc', + 'sqlOp_Scan parquet', + 'sqlOp_Scan text', 'sqlOp_SerializeFromObject', 'sqlOp_Sort', 'sqlOp_SortAggregate', @@ -481,8 +486,8 @@ def combine_tables(table_name: str) -> pd.DataFrame: 'Scan DeltaCDFRelation', 'Scan ExistingRDD Delta Table Checkpoint', 'Scan ExistingRDD Delta Table State', - 'Scan JDBCRelation', - 'Scan parquet ', # trailing space is also in default sql op name + 'Scan jdbc', + 'Scan parquet', # GPU 'GpuScan parquet', ] From 1b3f5dd54ea917cfc6606068997feb077a0cdf24 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Dec 2024 15:51:35 -0800 Subject: [PATCH 6/8] update qualx changes --- user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py index 2961f9f90..111515373 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py @@ -108,11 +108,11 @@ 'sqlOp_CustomShuffleReader', 'sqlOp_DeserializeToObject', 'sqlOp_Exchange', - 'sqlOp_Execute InsertIntoHadoopFsRelationCommand', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand csv', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand parquet', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand orc', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand json', + 'sqlOp_Execute InsertIntoHadoopFsRelationCommand unknown', 'sqlOp_Expand', 'sqlOp_Filter', 'sqlOp_Generate', @@ -139,6 +139,7 @@ 'sqlOp_Scan orc', 'sqlOp_Scan parquet', 'sqlOp_Scan text', + 'sqlOp_Scan unknown', 'sqlOp_SerializeFromObject', 'sqlOp_Sort', 'sqlOp_SortAggregate', From eb919da26ab4d0d97cb06b17878b80e3441ac2e4 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Dec 2024 17:21:12 -0800 Subject: [PATCH 7/8] Fix for Scan OneRowRelation Signed-off-by: Niranjan Artal --- 
.../com/nvidia/spark/rapids/tool/planparser/ReadParser.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala index a87ed67d1..ac43bf783 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala @@ -42,6 +42,8 @@ case class ReadMetaData(schema: String, location: String, format: String, object ReadParser extends Logging { // It was found that some eventlogs could have "NativeScan" instead of "Scan" val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan") + // Do not include OneRowRelation in the scan nodes, consider it as regular Exec + val SCAN_ONE_ROW_RELATION = "Scan OneRowRelation" // DatasourceV2 node names that exactly match the following labels val DATASOURCE_V2_NODE_EXACT_PREF = Set( "BatchScan") @@ -66,7 +68,7 @@ object ReadParser extends Logging { ) def isScanNode(nodeName: String): Boolean = { - SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) + SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) && !nodeName.startsWith(SCAN_ONE_ROW_RELATION) } def isScanNode(node: SparkPlanGraphNode): Boolean = { From a2cc6705bbb56aad37836d6a1fd5ee93b5ff7bdb Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Wed, 4 Dec 2024 08:36:57 -0800 Subject: [PATCH 8/8] addressed review comments --- .../com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala | 2 +- .../main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala | 4 ++-- user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala index 845efa696..3e5f22c08 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala @@ -40,7 +40,7 @@ object OpRef { val OP_NAMES: ConcurrentHashMap[String, OpRef] = { val initMap = new ConcurrentHashMap[String, OpRef]() initMap.put(EMPTY_OP_NAME_REF.value, EMPTY_OP_NAME_REF) - // Add the accum to the map because it is being used internally. + // Add the operator to the map because it is being used internally. 
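
// Illustrative sketch (not part of this patch): the isScanNode change in PATCH 7/8 above.
// "Scan OneRowRelation" is now treated as a regular exec rather than a read; the prefixes
// mirror SCAN_NODE_PREFIXES, and the node names are examples.
object ScanNodeSketch {
  val prefixes = Seq("Scan", "NativeScan")
  val oneRowRelation = "Scan OneRowRelation"

  def isScanNode(name: String): Boolean =
    prefixes.exists(name.startsWith(_)) && !name.startsWith(oneRowRelation)

  def main(args: Array[String]): Unit = {
    assert(isScanNode("Scan parquet spark_catalog.db.tbl"))
    assert(isScanNode("NativeScan parquet"))
    assert(!isScanNode("Scan OneRowRelation"))
  }
}
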
initMap } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index f8d4d7703..b90917cd8 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -378,11 +378,11 @@ abstract class AppBase( allMetaWithSchema.foreach { plan => val meta = plan.metadata val readSchema = ReadParser.formatSchemaStr(meta.getOrElse("ReadSchema", "")) - val scanNode = allNodes.filter(node => { + val scanNode = allNodes.filter(ReadParser.isScanNode(_)).filter(node => { // Get ReadSchema of each Node and sanitize it for comparison val trimmedNode = AppBase.trimSchema(ReadParser.parseReadNode(node).schema) readSchema.contains(trimmedNode) - }).filter(ReadParser.isScanNode(_)) + }) // If the ReadSchema is empty or if it is PhotonScan, then we don't need to // add it to the dataSourceInfo diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py index 111515373..1bdbd76b7 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py @@ -112,6 +112,7 @@ 'sqlOp_Execute InsertIntoHadoopFsRelationCommand parquet', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand orc', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand json', + 'sqlOp_Execute InsertIntoHadoopFsRelationCommand text', 'sqlOp_Execute InsertIntoHadoopFsRelationCommand unknown', 'sqlOp_Expand', 'sqlOp_Filter',