From 993bc8f82e5c512349c3485fb15dc144f0bbc9b5 Mon Sep 17 00:00:00 2001 From: Niranjan Artal <50492963+nartal1@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:03:15 -0800 Subject: [PATCH] Report all operators in the output file (#1444) * first iterations all the UTs pass Signed-off-by: Ahmed Hussein (amahussein) * running end-to-end Signed-off-by: Ahmed Hussein (amahussein) * cleaned-up exec parsers Signed-off-by: Ahmed Hussein (amahussein) * update documentation and fix Signed-off-by: Niranjan Artal * qualx related changes * update qualx changes * Fix for Scan OneRowRelation Signed-off-by: Niranjan Artal * addressed review comments --------- Signed-off-by: Niranjan Artal Co-authored-by: Ahmed Hussein (amahussein) --- .../tool/planparser/BatchScanExecParser.scala | 28 ++++- .../BroadcastExchangeExecParser.scala | 3 +- .../BroadcastHashJoinExecParser.scala | 6 +- .../BroadcastNestedLoopJoinExecParser.scala | 6 +- .../DataWritingCommandExecParser.scala | 6 +- .../tool/planparser/DeltaLakeHelper.scala | 4 +- .../rapids/tool/planparser/ExecParser.scala | 5 +- .../planparser/FileSourceScanExecParser.scala | 35 +++++- .../tool/planparser/GenericExecParser.scala | 14 ++- .../rapids/tool/planparser/ReadParser.scala | 12 +- .../tool/planparser/SQLPlanParser.scala | 73 ++++++++---- .../ShuffleExchangeExecParser.scala | 4 +- .../ShuffledHashJoinExecParser.scala | 4 +- .../planparser/SortMergeJoinExecParser.scala | 4 +- .../SubqueryBroadcastExecParser.scala | 5 +- .../tool/planparser/SubqueryExecParser.scala | 3 +- .../planparser/WholeStageExecParser.scala | 30 ++++- .../planparser/WindowGroupLimitParser.scala | 11 +- .../planparser/WriteFilesExecParser.scala | 5 +- .../rapids/tool/planparser/ops/OpRef.scala | 64 +++++++++++ .../tool/planparser/ops/OperatorCounter.scala | 106 ++++++++++++++++++ .../tool/planparser/ops/OperatorRefBase.scala | 41 +++++++ .../planparser/ops/OperatorRefTrait.scala | 24 ++++ .../planparser/ops/UnsupportedExprOpRef.scala | 43 +++++++ .../qualification/PluginTypeChecker.scala | 6 +- .../tool/qualification/QualOutputWriter.scala | 73 +++++++++++- .../tool/qualification/Qualification.scala | 1 + .../spark/sql/rapids/tool/AppBase.scala | 4 +- .../spark/sql/rapids/tool/ToolUtils.scala | 2 - .../qualification/QualificationAppInfo.scala | 9 +- .../db_sim_test_expectation.csv | 2 +- .../directory_test_expectation.csv | 2 +- .../jdbc_expectation.csv | 2 +- .../qual_test_missing_sql_end_expectation.csv | 2 +- .../qual_test_simple_expectation.csv | 2 +- .../truncated_1_end_expectation.csv | 2 +- .../tool/planparser/SqlPlanParserSuite.scala | 19 ++-- .../PluginTypeCheckerSuite.scala | 2 +- .../qualification/QualificationSuite.scala | 4 +- .../tools/qualx/preprocess.py | 25 +++-- 40 files changed, 574 insertions(+), 119 deletions(-) create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala index a8188065c..7c591b372 100644 --- 
a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala @@ -27,8 +27,8 @@ case class BatchScanExecParser( checker: PluginTypeChecker, sqlID: Long, app: AppBase) extends ExecParser with Logging { - - val fullExecName = "BatchScanExec" + val nodeName = "BatchScan" + val fullExecName = nodeName + "Exec" override def parse: ExecInfo = { val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId) @@ -39,8 +39,26 @@ case class BatchScanExecParser( val speedupFactor = checker.getSpeedupFactor(fullExecName) val overallSpeedup = Math.max((speedupFactor * score), 1.0) - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, s"${node.name} ${readInfo.format}", s"Format: ${readInfo.format}", - overallSpeedup, maxDuration, node.id, score > 0, None) + // 1- Set the exec name to be the batchScan + format + // 2- If the format cannot be found, then put the entire node description to make it easy to + // troubleshoot by reading the output files. + val readFormat = readInfo.getReadFormatLC + val execExpression = if (readInfo.hasUnknownFormat) { + node.desc + } else { + s"Format: $readFormat" + } + + ExecInfo.createExecNoNode( + sqlID = sqlID, + exec = s"$nodeName $readFormat", + expr = execExpression, + speedupFactor = overallSpeedup, + duration = maxDuration, + nodeId = node.id, + opType = OpTypes.ReadExec, + isSupported = score > 0.0, + children = None, + expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala index f9c348ea6..e6bbc99d8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala @@ -44,8 +44,7 @@ case class BroadcastExchangeExecParser( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, - duration, node.id, isSupported, None) + duration, node.id, isSupported, children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala index 8426f1047..8acd2db7b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala @@ -30,7 +30,7 @@ case class BroadcastHashJoinExecParser( override def parse: ExecInfo = { // BroadcastHashJoin doesn't have duration val duration = None - val exprString = node.desc.replaceFirst("BroadcastHashJoin ", "") + val exprString = node.desc.replaceFirst("^BroadcastHashJoin\\s*", "") val (expressions, supportedJoinType) = SQLPlanParser.parseEquijoinsExpressions(exprString) val notSupportedExprs = expressions.filterNot(expr => checker.isExprSupported(expr)) val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && @@ -39,7 +39,7 @@ case class BroadcastHashJoinExecParser( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? 
- ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, + children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala index 49f242efc..f6be5decd 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala @@ -39,7 +39,7 @@ abstract class BroadcastNestedLoopJoinExecParserBase( override def parse: ExecInfo = { // BroadcastNestedLoopJoin doesn't have duration - val exprString = node.desc.replaceFirst("BroadcastNestedLoopJoin ", "") + val exprString = node.desc.replaceFirst("^BroadcastNestedLoopJoin\\s*", "") val (buildSide, joinType) = extractBuildAndJoinTypes(exprString) val (expressions, supportedJoinType) = SQLPlanParser.parseNestedLoopJoinExpressions(exprString, buildSide, joinType) @@ -51,8 +51,8 @@ abstract class BroadcastNestedLoopJoinExecParserBase( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, + children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala index fedbd2ceb..86e553ac1 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala @@ -37,11 +37,11 @@ case class DataWritingCommandExecParser( val duration = None val speedupFactor = checker.getSpeedupFactor(wStub.mappedExec) val finalSpeedup = if (writeSupported) speedupFactor else 1 - // TODO - add in parsing expressions - average speedup across? // We do not want to parse the node description to avoid mistakenly marking the node as RDD/UDF ExecInfo.createExecNoNode(sqlID, s"${node.name.trim} ${wStub.dataFormat.toLowerCase.trim}", s"Format: ${wStub.dataFormat.toLowerCase.trim}", - finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported, None) + finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported, + children = None, expressions = Seq.empty) } } @@ -175,4 +175,4 @@ object DataWritingCommandExecParser { parsedString.split(",")(0) // return third parameter from the input string } } -} \ No newline at end of file +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala index 82ece4f45..884ad8907 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DeltaLakeHelper.scala @@ -45,10 +45,10 @@ class DLWriteWithFormatAndSchemaParser(node: SparkPlanGraphNode, val finalSpeedupFactor = if (writeSupported) speedupFactor else 1.0 // execs like SaveIntoDataSourceCommand has prefix "Execute". So, we need to get rid of it. 
- val nodeName = node.name.replace("Execute ", "") + val nodeName = node.name.replaceFirst("Execute\\s*", "") ExecInfo.createExecNoNode(sqlID, nodeName, s"Format: $dataFormat", finalSpeedupFactor, None, node.id, OpTypes.WriteExec, - isSupported = writeSupported && isExecSupported, children = None) + isSupported = writeSupported && isExecSupported, children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala index b43d9943f..c0ea0f369 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParser.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.tool.planparser -import org.apache.spark.sql.rapids.tool.UnsupportedExpr +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef trait ExecParser { def parse: ExecInfo @@ -32,5 +32,6 @@ trait ExecParser { * @param expressions Array of expression strings to evaluate for support. * @return Empty Seq[UnsupportedExpr], indicating no unsupported expressions by default. */ - def getUnsupportedExprReasonsForExec(expressions: Array[String]): Seq[UnsupportedExpr] = Seq.empty + def getUnsupportedExprReasonsForExec( + expressions: Array[String]): Seq[UnsupportedExprOpRef] = Seq.empty } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala index 85387872f..520d05832 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala @@ -30,6 +30,9 @@ case class FileSourceScanExecParser( // The node name for Scans is Scan so here we hardcode val fullExecName = "FileSourceScanExec" + // Matches the first alphanumeric characters of a string after trimming leading/trailing + // white spaces. + val nodeNameRegeX = """^\s*(\w+).*""".r override def parse: ExecInfo = { // Remove trailing spaces from node name val nodeName = node.name.trim val rddCheckRes = RDDCheckHelper.isDatasetOrRDDPlan(nodeName, node.desc) if (rddCheckRes.nodeNameRDD) { - // This is a scanRDD. we do not need to parse it as a normal node. + // This is a scanRDD. We do not need to parse it as a normal node. // cleanup the node name if possible: val newNodeName = if (nodeName.contains("ExistingRDD")) { val nodeNameLength = nodeName.indexOf("ExistingRDD") + "ExistingRDD".length @@ -46,7 +49,7 @@ nodeName } ExecInfo.createExecNoNode(sqlID, newNodeName, "", 1.0, duration = None, - node.id, OpTypes.ReadRDD, false, None) + node.id, OpTypes.ReadRDD, false, children = None, expressions = Seq.empty) } else { val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId) val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) @@ -57,14 +60,36 @@ // Use the default parser (fullExecName, ReadParser.parseReadNode(node)) } + // 1- Set the exec name to nodeLabel + format + // 2- If the format is not found, then put the entire node description to make it easy to + // troubleshoot by reading the output files.
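// Editorial sketch (not part of the patch): how the label regex above behaves on typical
// scan-node names; the example names below are hypothetical.
//   """^\s*(\w+).*""".r.findFirstMatchIn("  Scan parquet db.tbl").map(_.group(1)) // Some("Scan")
//   """^\s*(\w+).*""".r.findFirstMatchIn("***").map(_.group(1))                   // None => fall back to execName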
+ val nodeLabel = nodeNameRegeX.findFirstMatchIn(nodeName) match { + case Some(m) => m.group(1) + // in case not found, use the full exec name + case None => execName + } + val readFormat = readInfo.getReadFormatLC + val execExpr = if (readInfo.hasUnknownFormat) { + node.desc + } else { + s"Format: ${readFormat}" + } val speedupFactor = checker.getSpeedupFactor(execName) // don't use the isExecSupported because we have finer grain. val score = ReadParser.calculateReadScoreRatio(readInfo, checker) val overallSpeedup = Math.max(speedupFactor * score, 1.0) - // TODO - add in parsing expressions - average speedup across? - ExecInfo.createExecNoNode(sqlID, nodeName, "", overallSpeedup, maxDuration, - node.id, OpTypes.ReadExec, score > 0, None) + ExecInfo.createExecNoNode( + sqlID = sqlID, + exec = s"$nodeLabel $readFormat", + expr = execExpr, + speedupFactor = overallSpeedup, + duration = maxDuration, + nodeId = node.id, + opType = OpTypes.ReadExec, + isSupported = score > 0, + children = None, + expressions = Seq.empty) } } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala index 62ffb5eaa..6295c5533 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala @@ -16,10 +16,11 @@ package com.nvidia.spark.rapids.tool.planparser +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.sql.execution.ui.SparkPlanGraphNode -import org.apache.spark.sql.rapids.tool.{AppBase, UnsupportedExpr} +import org.apache.spark.sql.rapids.tool.AppBase class GenericExecParser( val node: SparkPlanGraphNode, @@ -46,7 +47,8 @@ (1.0, false) } - createExecInfo(speedupFactor, isSupported, duration, notSupportedExprs) + createExecInfo(speedupFactor, isSupported, duration, + notSupportedExprs = notSupportedExprs, expressions = expressions) } protected def parseExpressions(): Array[String] = { @@ -63,7 +65,7 @@ node.desc.replaceFirst(s"^${node.name}\\s*", "") } - protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExpr] = { + protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExprOpRef] = { checker.getNotSupportedExprs(expressions) } @@ -83,7 +85,8 @@ speedupFactor: Double, isSupported: Boolean, duration: Option[Long], - notSupportedExprs: Seq[UnsupportedExpr] + notSupportedExprs: Seq[UnsupportedExprOpRef], + expressions: Array[String] ): ExecInfo = { ExecInfo( node, @@ -95,7 +98,8 @@ node.id, isSupported, None, - unsupportedExprs = notSupportedExprs + unsupportedExprs = notSupportedExprs, + expressions = expressions ) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala index 131d6b2d3..ac43bf783 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala @@ -29,11 +29,21 @@ case class ReadMetaData(schema: String, location: String, format: String, def pushedFilters: String = tags(ReadParser.METAFIELD_TAG_PUSHED_FILTERS) def dataFilters: String =
tags(ReadParser.METAFIELD_TAG_DATA_FILTERS) def partitionFilters: String = tags(ReadParser.METAFIELD_TAG_PARTITION_FILTERS) + + def hasUnknownFormat: Boolean = format.equals(ReadParser.UNKNOWN_METAFIELD) + + /** + * Returns the read format in lowercase. This is used to be consistent. + * @return the lower case of the read format + */ + def getReadFormatLC: String = format.toLowerCase } object ReadParser extends Logging { // It was found that some eventlogs could have "NativeScan" instead of "Scan" val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan") + // Do not include OneRowRelation in the scan nodes, consider it as regular Exec + val SCAN_ONE_ROW_RELATION = "Scan OneRowRelation" // DatasourceV2 node names that exactly match the following labels val DATASOURCE_V2_NODE_EXACT_PREF = Set( "BatchScan") @@ -58,7 +68,7 @@ object ReadParser extends Logging { ) def isScanNode(nodeName: String): Boolean = { - SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) + SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) && !nodeName.startsWith(SCAN_ONE_ROW_RELATION) } def isScanNode(node: SparkPlanGraphNode): Boolean = { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index ad2634c41..17159842b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -21,16 +21,18 @@ import scala.collection.mutable.{ArrayBuffer, WeakHashMap} import scala.util.control.NonFatal import scala.util.matching.Regex +import com.nvidia.spark.rapids.tool.planparser.ops.{OperatorRefBase, OpRef, UnsupportedExprOpRef} import com.nvidia.spark.rapids.tool.planparser.photon.{PhotonPlanParser, PhotonStageExecParser} import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode} -import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils, UnsupportedExpr} +import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils} import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph import org.apache.spark.sql.rapids.tool.util.plangraph.{PhotonSparkPlanGraphCluster, PhotonSparkPlanGraphNode} + object OpActions extends Enumeration { type OpAction = Value val NONE, IgnoreNoPerf, IgnorePerf, Triage = Value @@ -73,11 +75,10 @@ object UnsupportedReasons extends Enumeration { case class UnsupportedExecSummary( sqlId: Long, execId: Long, - execValue: String, + execRef: OperatorRefBase, opType: OpTypes.OpType, reason: UnsupportedReasons.UnsupportedReason, - opAction: OpActions.OpAction, - isExpression: Boolean = false) { + opAction: OpActions.OpAction) { val finalOpType: String = if (opType.equals(OpTypes.UDF) || opType.equals(OpTypes.DataSet)) { s"${OpTypes.Exec.toString}" @@ -85,14 +86,16 @@ case class UnsupportedExecSummary( s"${opType.toString}" } - val unsupportedOperator: String = execValue + val unsupportedOperatorCSVFormat: String = execRef.getOpNameCSV val details: String = UnsupportedReasons.reportUnsupportedReason(reason) + + def isExpression: Boolean = execRef.isInstanceOf[UnsupportedExprOpRef] } case class ExecInfo( sqlID: Long, - exec: String, + execRef: OpRef, expr: String, speedupFactor: Double, duration: 
Option[Long], @@ -103,10 +106,11 @@ case class ExecInfo( var stages: Set[Int], var shouldRemove: Boolean, var unsupportedExecReason: String, - unsupportedExprs: Seq[UnsupportedExpr], + unsupportedExprs: Seq[UnsupportedExprOpRef], dataSet: Boolean, udf: Boolean, - shouldIgnore: Boolean) { + shouldIgnore: Boolean, + expressions: Seq[OpRef]) { private def childrenToString = { val str = children.map { c => @@ -119,6 +123,12 @@ case class ExecInfo( } } + def exec: String = execRef.value + + def isClusterNode: Boolean = { + execRef.getOpName.contains("StageCodegen") || execRef.getOpName.contains("PhotonResultStage") + } + override def toString: String = { s"exec: $exec, expr: $expr, sqlID: $sqlID , speedupFactor: $speedupFactor, " + s"duration: $duration, nodeId: $nodeId, " + @@ -194,7 +204,7 @@ case class ExecInfo( getUnsupportedReason) // Initialize the result with the exec summary - val res = ArrayBuffer(UnsupportedExecSummary(sqlID, execId, exec, opType, + val res = ArrayBuffer(UnsupportedExecSummary(sqlID, execId, execRef, opType, execUnsupportedReason, getOpAction)) // TODO: Should we iterate on exec children? @@ -211,8 +221,8 @@ case class ExecInfo( unsupportedExprs.foreach { expr => val exprUnsupportedReason = determineUnsupportedReason(expr.unsupportedReason, exprKnownReason) - res += UnsupportedExecSummary(sqlID, execId, expr.exprName, OpTypes.Expr, - exprUnsupportedReason, getOpAction, isExpression = true) + res += UnsupportedExecSummary(sqlID, execId, expr, OpTypes.Expr, + exprUnsupportedReason, getOpAction) } } res @@ -235,9 +245,10 @@ object ExecInfo { stages: Set[Int] = Set.empty, shouldRemove: Boolean = false, unsupportedExecReason: String = "", - unsupportedExprs: Seq[UnsupportedExpr] = Seq.empty, + unsupportedExprs: Seq[UnsupportedExprOpRef] = Seq.empty, dataSet: Boolean = false, - udf: Boolean = false): ExecInfo = { + udf: Boolean = false, + expressions: Seq[String]): ExecInfo = { // Set the ignoreFlag // 1- we ignore any exec with UDF // 2- we ignore any exec with dataset @@ -259,7 +270,7 @@ object ExecInfo { val supportedFlag = isSupported && !udf && !finalDataSet ExecInfo( sqlID, - exec, + OpRef.fromExec(exec), expr, speedupFactor, duration, @@ -273,7 +284,9 @@ object ExecInfo { unsupportedExprs, finalDataSet, udf, - shouldIgnore + shouldIgnore, + // convert array of string expressions to OpRefs + expressions = expressions.map(OpRef.fromExpr) ) } @@ -290,10 +303,11 @@ object ExecInfo { stages: Set[Int] = Set.empty, shouldRemove: Boolean = false, unsupportedExecReason:String = "", - unsupportedExprs: Seq[UnsupportedExpr] = Seq.empty, + unsupportedExprs: Seq[UnsupportedExprOpRef] = Seq.empty, dataSet: Boolean = false, udf: Boolean = false, - opType: OpTypes.OpType = OpTypes.Exec): ExecInfo = { + opType: OpTypes.OpType = OpTypes.Exec, + expressions: Seq[String]): ExecInfo = { // Some execs need to be trimmed such as "Scan" // Example: Scan parquet . -> Scan parquet. // scan nodes needs trimming @@ -307,7 +321,7 @@ object ExecInfo { // if the expression is RDD because of the node name, then we do not want to add the // unsupportedExpressions because it becomes bogus. 
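// Editorial note (not part of the patch): for a node whose description marks it as an
// RDD/Dataset plan, the tokens parsed out of that description are not real SQL expressions,
// so reporting them as unsupported expressions would be misleading; the list is cleared below.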
val finalUnsupportedExpr = if (rddCheckRes.nodeDescRDD) { - Seq.empty[UnsupportedExpr] + Seq.empty[UnsupportedExprOpRef] } else { unsupportedExprs } @@ -326,7 +340,8 @@ object ExecInfo { unsupportedExecReason, finalUnsupportedExpr, ds, - containsUDF + containsUDF, + expressions = expressions ) } } @@ -335,8 +350,18 @@ case class PlanInfo( appID: String, sqlID: Long, sqlDesc: String, - execInfo: Seq[ExecInfo] -) + execInfo: Seq[ExecInfo]) { + def getUnsupportedExpressions: Seq[OperatorRefBase] = { + execInfo.flatMap { e => + if (e.isClusterNode) { + // wholeStageCodeGen does not have expressions/unsupported-expressions + e.children.getOrElse(Seq.empty).flatMap(_.unsupportedExprs) + } else { + e.unsupportedExprs + } + } + } +} object SQLPlanParser extends Logging { @@ -528,7 +553,8 @@ object SQLPlanParser extends Logging { // supported but with shouldRemove flag set to True. // Setting the "shouldRemove" is handled at the end of the function. ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, - isSupported = reuseExecs.contains(normalizedNodeName), None) + isSupported = reuseExecs.contains(normalizedNodeName), children = None, + expressions = Seq.empty) } } @@ -584,7 +610,8 @@ object SQLPlanParser extends Logging { logWarning(s"Unexpected error parsing plan node ${normalizedNodeName}. " + s" sqlID = ${sqlID}", e) ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, - isSupported = false, None) + isSupported = false, children = None, + expressions = Seq.empty) } val stagesInNode = nodeIdToStagesFunc(node.id) execInfo.setStages(stagesInNode) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala index 2a1315718..2026a2a17 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala @@ -44,7 +44,7 @@ case class ShuffleExchangeExecParser( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, + children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala index b81f33fd6..4c90928a4 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala @@ -43,9 +43,7 @@ case class ShuffledHashJoinExecParser( } else { (1.0, false) } - - // TODO - add in parsing expressions - average speedup across? 
ExecInfo(node, sqlID, node.name, "", speedupFactor, - maxDuration, node.id, isSupported, None) + maxDuration, node.id, isSupported, children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala index f98a0542b..4828d8ac9 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala @@ -42,8 +42,8 @@ case class SortMergeJoinExecParser( } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, opName, "", speedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, opName, "", speedupFactor, duration, node.id, isSupported, + children = None, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala index 64fc5ff5e..048a317b0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala @@ -38,8 +38,7 @@ case class SubqueryBroadcastExecParser( } else { (1.0, false) } - // TODO - check is broadcast associated can be replaced - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) + ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, + children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala index c60eff7ac..e01f7a27a 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryExecParser.scala @@ -39,7 +39,8 @@ case class SubqueryExecParser( // TODO: Should we also collect the "data size" metric? val duration = SQLPlanParser.getDriverTotalDuration(collectTimeId, app) // should remove is kept in 1 place. So no need to set it here. - ExecInfo(node, sqlID, node.name, "", 1.0, duration, node.id, isSupported = false, None) + ExecInfo(node, sqlID, node.name, "", 1.0, duration, node.id, isSupported = false, + children = None, expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala index cdef3b225..5b8730938 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala @@ -31,6 +31,9 @@ abstract class WholeStageExecParserBase( nodeIdToStagesFunc: Long => Set[Int]) extends Logging { val fullExecName = "WholeStageCodegenExec" + // Matches the first alphanumeric characters of a string after trimming leading/trailing + // white spaces. + val nodeNameRegeX = """^\s*(\w+).*""".r def parse: Seq[ExecInfo] = { // TODO - does metrics for time have previous ops? 
per op thing, only some do @@ -50,17 +53,34 @@ abstract class WholeStageExecParserBase( } // if any of the execs in WholeStageCodegen supported mark this entire thing as supported val anySupported = childNodes.exists(_.isSupported == true) - val unSupportedExprsArray = - childNodes.filter(_.unsupportedExprs.nonEmpty).flatMap(x => x.unsupportedExprs).toArray // average speedup across the execs in the WholeStageCodegen for now val supportedChildren = childNodes.filterNot(_.shouldRemove) val avSpeedupFactor = SQLPlanParser.averageSpeedup(supportedChildren.map(_.speedupFactor)) // The node should be marked as shouldRemove when all the children of the // wholeStageCodeGen are marked as shouldRemove. val removeNode = isDupNode || childNodes.forall(_.shouldRemove) - val execInfo = ExecInfo(node, sqlID, node.name, node.name, avSpeedupFactor, maxDuration, - node.id, anySupported, Some(childNodes), stagesInNode, - shouldRemove = removeNode, unsupportedExprs = unSupportedExprsArray) + // Remove any suffix in order to get the node label without any trailing number. + val nodeLabel = nodeNameRegeX.findFirstMatchIn(node.name) match { + case Some(m) => m.group(1) + // in case not found, use the full exec name + case None => fullExecName + } + val execInfo = ExecInfo( + node = node, + sqlID = sqlID, + exec = nodeLabel, + expr = node.name, + speedupFactor = avSpeedupFactor, + duration = maxDuration, + nodeId = node.id, + isSupported = anySupported, + children = Some(childNodes), + stages = stagesInNode, + shouldRemove = removeNode, + // unsupported expressions should not be set for the cluster nodes. + unsupportedExprs = Seq.empty, + // expressions of wholeStageCodeGen should not be set. They belong to the children nodes. + expressions = Seq.empty) Seq(execInfo) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala index 01b5edc3d..853c1b767 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowGroupLimitParser.scala @@ -16,10 +16,10 @@ package com.nvidia.spark.rapids.tool.planparser +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.sql.execution.ui.SparkPlanGraphNode -import org.apache.spark.sql.rapids.tool.UnsupportedExpr case class WindowGroupLimitParser( node: SparkPlanGraphNode, @@ -36,10 +36,10 @@ case class WindowGroupLimitParser( } override def getUnsupportedExprReasonsForExec( - expressions: Array[String]): Seq[UnsupportedExpr] = { + expressions: Array[String]): Seq[UnsupportedExprOpRef] = { expressions.flatMap { expr => if (!supportedRankingExprs.contains(expr)) { - Some(UnsupportedExpr(expr, + Some(UnsupportedExprOpRef(expr, s"Ranking function $expr is not supported in $fullExecName")) } else { None @@ -57,7 +57,7 @@ case class WindowGroupLimitParser( * 3. Ranking function is supported by plugin's implementation of WindowGroupLimitExec. 
*/ override def parse: ExecInfo = { - val exprString = node.desc.replaceFirst("WindowGroupLimit ", "") + val exprString = node.desc.replaceFirst("WindowGroupLimit\\s*", "") val expressions = SQLPlanParser.parseWindowGroupLimitExpressions(exprString) val notSupportedExprs = checker.getNotSupportedExprs(expressions) ++ getUnsupportedExprReasonsForExec(expressions) @@ -69,8 +69,7 @@ } else { (1.0, false) } - // TODO - add in parsing expressions - average speedup across? ExecInfo(node, sqlID, node.name, "", speedupFactor, None, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) + unsupportedExprs = notSupportedExprs, expressions = expressions) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala index bcd2272e0..0fa6cb254 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala @@ -44,7 +44,10 @@ case class WriteFilesExecParser( "", speedupFactor, duration, - node.id, opType = OpTypes.WriteExec, true, None) + node.id, opType = OpTypes.WriteExec, + isSupported = true, + children = None, + expressions = Seq.empty) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala new file mode 100644 index 000000000..3e5f22c08 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser.ops + +import java.util.concurrent.ConcurrentHashMap + +import com.nvidia.spark.rapids.tool.planparser.OpTypes + + +/** + * A reference to an operator (either Exec operator or expression operator) in a Spark plan. + * + * @param value The name of the operator. + * @param opType The type of the operator (e.g., Exec, Expr). + */ +case class OpRef(override val value: String, + override val opType: OpTypes.OpType) extends OperatorRefBase(value, opType) + +object OpRef { + // Dummy OpRef to represent empty operator names. This is an optimization to avoid + // storing an Option[String] for all operator names, which leads to "get-or-else" everywhere. + private val EMPTY_OP_NAME_REF: OpRef = new OpRef("", OpTypes.Exec) + // A global table to store references to all operator names. The map is accessible by all + // threads (different applications) running in parallel. This avoids duplicate work across + // different threads. + val OP_NAMES: ConcurrentHashMap[String, OpRef] = { + val initMap = new ConcurrentHashMap[String, OpRef]() + initMap.put(EMPTY_OP_NAME_REF.value, EMPTY_OP_NAME_REF) + // The empty OpRef is registered up front because it is used internally.
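// Editorial sketch (not part of the patch): computeIfAbsent interns each distinct name,
// so every parser thread shares one OpRef per operator name:
//   val a = OpRef.fromExec("Filter")
//   val b = OpRef.fromExec("Filter")
//   assert(a eq b) // same cached instance; its CSV form is computed only once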
+ initMap + } + + /** + * Retrieves an `OpRef` for an expression operator. + * If the operator name already exists in the cache, it returns the existing `OpRef`. + * Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Expr`. + */ + def fromExpr(name: String): OpRef = { + OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Expr)) + } + + /** + * Retrieves an `OpRef` for an exec operator. + * If the operator name already exists in the cache, it returns the existing `OpRef`. + * Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Exec`. + */ + def fromExec(name: String): OpRef = { + OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Exec)) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala new file mode 100644 index 000000000..fab9422aa --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser.ops + +import scala.collection.mutable + +import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo} + +/** + * `OperatorCounter` is responsible for counting the occurrences of execs and expressions + * in a given execution plan (`PlanInfo`). It maintains counts separately for supported and + * unsupported execs and expressions. + * + * @param planInfo The execution plan information to analyze. + */ +case class OperatorCounter(planInfo: PlanInfo) { + + /** + * Represents data for an exec or expression, including its reference, + * occurrence count, and stages where it appears. + * + * @param opRef The operator reference. + * @param count The number of times the operator appears. + * @param stages The set of stages where the operator appears. + */ + case class OperatorData( + opRef: OperatorRefBase, + var count: Int = 0, + var stages: Set[Int] = Set()) + + // Summarizes the count information for an exec or expression, including whether it is supported. + case class OperatorCountSummary( + opData: OperatorData, + isSupported: Boolean) + + private val supportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() + private val unsupportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() + + // Returns a sequence of `OperatorCountSummary`, combining both supported and + // unsupported operators. + def getOpsCountSummary(): Seq[OperatorCountSummary] = { + supportedMap.values.map(OperatorCountSummary(_, isSupported = true)).toSeq ++ + unsupportedMap.values.map(OperatorCountSummary(_, isSupported = false)).toSeq + } + + + // Updates the operator data in the given map (supported or unsupported). + // Increments the count and updates the stages where the operator appears. 
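// Editorial sketch (not part of the patch): for a plan containing two supported Filter
// execs running in stages 2 and 3, the supported map would hold a single entry:
//   OperatorData(opRef = OpRef("Filter", OpTypes.Exec), count = 2, stages = Set(2, 3))
// Unsupported execs and expressions accumulate in unsupportedMap the same way.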
+ private def updateOpRefEntry(opRef: OperatorRefBase, stages: Set[Int], + targetMap: mutable.Map[OperatorRefBase, OperatorData]): Unit = { + val operatorData = targetMap.getOrElseUpdate(opRef, OperatorData(opRef)) + operatorData.count += 1 + operatorData.stages ++= stages + } + + // Processes an `ExecInfo` node to update exec and expression counts. + // Separates supported and unsupported execs and expressions into their respective maps. + private def processExecInfo(execInfo: ExecInfo): Unit = { + val opMap = execInfo.isSupported match { + case true => supportedMap + case false => unsupportedMap + } + updateOpRefEntry(execInfo.execRef, execInfo.stages, opMap) + // update the map for supported expressions. We should exclude the unsupported expressions. + execInfo.expressions.filterNot( + e => execInfo.unsupportedExprs.exists(exp => exp.opRef.equals(e))).foreach { expr => + updateOpRefEntry(expr, execInfo.stages, supportedMap) + } + // update the map for unsupported expressions + execInfo.unsupportedExprs.foreach { expr => + updateOpRefEntry(expr, execInfo.stages, unsupportedMap) + } + } + + // Counts the execs and expressions in the execution plan. + private def countOperators(): Unit = { + planInfo.execInfo.foreach { exec => + exec.isClusterNode match { + // we do not want to count the cluster nodes in that aggregation + case true => + if (exec.children.nonEmpty) { + exec.children.get.foreach { child => + processExecInfo(child) + } + } + case false => processExecInfo(exec) + } + } + } + + countOperators() +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala new file mode 100644 index 000000000..04bd9a190 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser.ops + +import com.nvidia.spark.rapids.tool.planparser.OpTypes + +import org.apache.spark.sql.rapids.tool.util.StringUtils + +/** + * Base class representing a reference to an operator (either exec operator or expression operator). + * It provides methods to retrieve the operator's name and type in both raw and + * CSV-friendly formats. + * + * @param value The name of the operator. + * @param opType The type of the operator (e.g., Exec, Expr). + */ + +class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait { + // Preformatted values for CSV output to avoid reformatting multiple times. 
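// Editorial note (not part of the patch): OpRef instances are interned and the same
// operator name can appear in thousands of report rows, so escaping the name and type
// once here avoids re-running reformatCSVString for every row that references them.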
+ val csvValue: String = StringUtils.reformatCSVString(value) + val csvOpType: String = StringUtils.reformatCSVString(opType.toString) + + override def getOpName: String = value + override def getOpNameCSV: String = csvValue + override def getOpType: String = opType.toString + override def getOpTypeCSV: String = csvOpType +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala new file mode 100644 index 000000000..40385ba7a --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefTrait.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser.ops + +trait OperatorRefTrait { + def getOpName: String + def getOpNameCSV: String + def getOpType: String + def getOpTypeCSV: String +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala new file mode 100644 index 000000000..0e17d8f0d --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser.ops + + +/** + * Represents a reference to an unsupported expression operator. + * Extends `OperatorRefBase` and includes a reason why the expression is unsupported. + * + * @param opRef The underlying `OpRef` for the expression. + * @param unsupportedReason A string describing why the expression is unsupported. + */ +case class UnsupportedExprOpRef(opRef: OpRef, + unsupportedReason: String) extends OperatorRefBase(opRef.value, opRef.opType) + +// Provides a factory method to create an instance from an expression name and unsupported reason. +object UnsupportedExprOpRef { + /** + * Creates an `UnsupportedExprOpRef` for the given expression name and unsupported reason. + * + * @param exprName The name of the unsupported expression. + * @param unsupportedReason A string describing why the expression is unsupported. + * @return An instance of `UnsupportedExprOpRef`. 
+ */ + def apply(exprName: String, unsupportedReason: String): UnsupportedExprOpRef = { + val opRef = OpRef.fromExpr(exprName) + UnsupportedExprOpRef(opRef, unsupportedReason) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala index f04ddcac7..44bbf9fa3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala @@ -21,11 +21,11 @@ import scala.io.BufferedSource import scala.util.control.NonFatal import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory} +import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging -import org.apache.spark.sql.rapids.tool.UnsupportedExpr import org.apache.spark.sql.rapids.tool.util.UTF8Source object OpSuppLevel extends Enumeration { @@ -387,11 +387,11 @@ class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(), exprSupported.isSupported } - def getNotSupportedExprs(exprs: Seq[String]): Seq[UnsupportedExpr] = { + def getNotSupportedExprs(exprs: Seq[String]): Seq[UnsupportedExprOpRef] = { exprs.collect { case expr if !isExprSupported(expr) => val reason = unsupportedOpsReasons.getOrElse(expr, "") - UnsupportedExpr(expr, reason) + UnsupportedExprOpRef(expr, reason) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala index ed101d820..9dbe7c920 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.{Buffer, LinkedHashMap, ListBuffer} import com.nvidia.spark.rapids.tool.ToolTextFileWriter import com.nvidia.spark.rapids.tool.planparser.{DatabricksParseHelper, ExecInfo, PlanInfo, UnsupportedExecSummary} +import com.nvidia.spark.rapids.tool.planparser.ops.OperatorCounter import com.nvidia.spark.rapids.tool.profiling.AppStatusResult import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER} @@ -172,6 +173,24 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean, } } + def writeAllOpsSummaryCSVReport( + sums: Seq[QualificationSummaryInfo]): Unit = { + val csvFileWriter = new ToolTextFileWriter(outputDir, + s"${QualOutputWriter.LOGFILE_NAME}_operatorsStats.csv", + "All Operators CSV Report", hadoopConf) + try { + val headersAndSizes = QualOutputWriter.getAllOperatorsHeaderStrings + csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes, + QualOutputWriter.CSV_DELIMITER)) + sums.foreach { sum => + QualOutputWriter.constructAllOperatorsInfo(csvFileWriter, sum.planInfo, sum.appId, + QualOutputWriter.CSV_DELIMITER) + } + } finally { + csvFileWriter.close() + } + } + def writePerSqlCSVReport(sums: Seq[QualificationSummaryInfo], maxSQLDescLength: Int): Unit = { val csvFileWriter = new ToolTextFileWriter(outputDir, s"${QualOutputWriter.LOGFILE_NAME}_persql.csv", @@ -282,7 +301,7 @@ class 
QualOutputWriter(outputDir: String, reportReadSchema: Boolean, } else { ("", "") } - Seq(appIDCSVStr, sqlIDStr, StringUtils.reformatCSVString(info.exec), + Seq(appIDCSVStr, sqlIDStr, info.execRef.getOpNameCSV, StringUtils.reformatCSVString(info.expr), if (info.duration.isDefined) info.duration.get.toString else zeroDurationStr, info.nodeId.toString, @@ -445,6 +464,11 @@ val UNSUPPORTED_TASK_DURATION_STR = "Unsupported Task Duration" val SUPPORTED_SQL_TASK_DURATION_STR = "Supported SQL DF Task Duration" val LONGEST_SQL_DURATION_STR = "Longest SQL Duration" + val OPERATOR_TYPE = "Operator Type" + val OPERATOR_NAME = "Operator Name" + val COUNT = "Count" + val IS_SUPPORTED = "Supported" + val STAGES = "Stages" val EXEC_STR = "Exec Name" val EXPR_STR = "Expression Name" val EXEC_DURATION = "Exec Duration" @@ -838,6 +862,49 @@ mutable.LinkedHashMap(headersAndFields: _*) } + private def getAllOperatorsHeaderStrings: LinkedHashMap[String, Int] = { + val detailedHeaderAndFields = LinkedHashMap[String, Int]( + APP_ID_STR -> APP_ID_STR.size, + SQL_ID_STR -> SQL_ID_STR.size, + OPERATOR_TYPE -> OPERATOR_TYPE.size, + OPERATOR_NAME -> OPERATOR_NAME.size, + COUNT -> COUNT.size, + IS_SUPPORTED -> IS_SUPPORTED.size, + STAGES -> STAGES.size + ) + detailedHeaderAndFields + } + + private def constructAllOperatorsInfo( + csvWriter: ToolTextFileWriter, + planInfos: Seq[PlanInfo], + appId: String, + delimiter: String): Unit = { + // This method iterates over the PlanInfo entries, which are sorted by sqlID. It builds the operator rows per + // SQL plan and dumps them to the CSV as one buffered string instead of writing one row at a time. + val appIDCSVStr = StringUtils.reformatCSVString(appId) + val supportedCSVStr = "true" + val unsupportedCSVStr = "false" + planInfos.foreach { planInfo => + val sqlIDCSVStr = planInfo.sqlID.toString + val allOpsCount = OperatorCounter(planInfo) + .getOpsCountSummary().sortBy(oInfo => (-oInfo.opData.count, oInfo.opData.opRef.getOpName)) + if (allOpsCount.nonEmpty) { + val planBuffer = allOpsCount.map { opInfo => + val supportFlag = if (opInfo.isSupported) supportedCSVStr else unsupportedCSVStr + val stageStr = StringUtils.reformatCSVString(opInfo.opData.stages.mkString(":")) + s"$appIDCSVStr$delimiter" + + s"$sqlIDCSVStr$delimiter" + + s"${opInfo.opData.opRef.getOpTypeCSV}$delimiter" + + s"${opInfo.opData.opRef.getOpNameCSV}$delimiter${opInfo.opData.count}$delimiter" + + s"$supportFlag$delimiter" + + s"$stageStr" + } + csvWriter.write(s"${planBuffer.mkString("\n")}\n") + } + } + } + def constructClusterInfo( sumInfo: QualificationSummaryInfo, headersAndSizes: LinkedHashMap[String, Int], @@ -940,7 +1007,7 @@ // need to remove the WholeStageCodegen wrappers since they aren't actual // execs that we want to get timings of execs.flatMap { e => - if (e.exec.contains("WholeStageCodegen")) { + if (e.isClusterNode) { e.children.getOrElse(Seq.empty) } else { e.children.getOrElse(Seq.empty) :+ e } @@ -1048,7 +1115,7 @@ stageId.toString -> headersAndSizes(STAGE_ID_STR), reformatCSVFunc(unSupExecInfo.execId.toString) -> headersAndSizes(EXEC_ID), reformatCSVFunc(unSupExecInfo.finalOpType) -> headersAndSizes(UNSUPPORTED_TYPE), - reformatCSVFunc(unSupExecInfo.unsupportedOperator) -> headersAndSizes(UNSUPPORTED_OPERATOR), + unSupExecInfo.unsupportedOperatorCSVFormat -> headersAndSizes(UNSUPPORTED_OPERATOR), reformatCSVFunc(unSupExecInfo.details) -> headersAndSizes(DETAILS), stageAppDuration.toString ->
headersAndSizes(STAGE_WALLCLOCK_DUR_STR), appDuration.toString -> headersAndSizes(APP_DUR_STR), diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index 49d70b80e..fa59e8b3e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -255,6 +255,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration, qWriter.writeExecReport(allAppsSum) qWriter.writeStageReport(allAppsSum, order) qWriter.writeUnsupportedOpsSummaryCSVReport(allAppsSum) + qWriter.writeAllOpsSummaryCSVReport(allAppsSum) val appStatusResult = generateStatusResults(appStatusReporter.asScala.values.toSeq) logOutputPath() qWriter.writeStatusReport(appStatusResult, order) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index f8d4d7703..b90917cd8 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -378,11 +378,11 @@ abstract class AppBase( allMetaWithSchema.foreach { plan => val meta = plan.metadata val readSchema = ReadParser.formatSchemaStr(meta.getOrElse("ReadSchema", "")) - val scanNode = allNodes.filter(node => { + val scanNode = allNodes.filter(ReadParser.isScanNode(_)).filter(node => { // Get ReadSchema of each Node and sanitize it for comparison val trimmedNode = AppBase.trimSchema(ReadParser.parseReadNode(node).schema) readSchema.contains(trimmedNode) - }).filter(ReadParser.isScanNode(_)) + }) // If the ReadSchema is empty or if it is PhotonScan, then we don't need to // add it to the dataSourceInfo diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index fc276a064..021337495 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -401,8 +401,6 @@ object ExecHelper { } } -case class UnsupportedExpr(exprName: String, unsupportedReason: String) - object MlOps { val sparkml = "spark.ml." val xgBoost = "spark.XGBoost" diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 887075c8c..1ef8b7315 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -281,7 +281,7 @@ class QualificationAppInfo( // need to remove the WholeStageCodegen wrappers since they aren't actual // execs that we want to get timings of execs.flatMap { e => - if (e.exec.contains("WholeStageCodegen")) { + if (e.isClusterNode) { e.children.getOrElse(Seq.empty) } else { e.children.getOrElse(Seq.empty) :+ e @@ -573,7 +573,7 @@ class QualificationAppInfo( val unSupportedExecs = planInfos.flatMap { p => // WholeStageCodeGen is excluded from the result. 
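// Editorial note (not part of the patch): isClusterNode (defined on ExecInfo above) matches
// both "StageCodegen" and "PhotonResultStage" wrappers, so Photon plans are now excluded the
// same way as WholeStageCodegen instead of relying on the "WholeStage" name prefix.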
diff --git a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv
index 5c618e929..35d5d6f1b 100644
--- a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
-"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan;SerializeFromObject","",30,1599
+"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan unknown;SerializeFromObject","",30,1599
diff --git a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv
index 5c618e929..35d5d6f1b 100644
--- a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
-"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan;SerializeFromObject","",30,1599
+"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan unknown;SerializeFromObject","",30,1599
diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
index df554c136..50e8986ba 100644
--- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
-"Spark shell","app-20211019113801-0001",2942,19894,571967,2814,28.41,"","JDBC[*]","","","","",1812,2883,569025,859,19035,false,"CollectLimit;Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand","",30,9110
+"Spark shell","app-20211019113801-0001",2942,19894,571967,2814,28.41,"","JDBC[*]","","","","",1812,2883,569025,859,19035,false,"CollectLimit;Scan jdbc;Execute CreateViewCommand","",30,9110
diff --git a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
index 2561e6ca5..428f89654 100644
--- a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
-"Rapids Spark Profiling Tool Unit Tests","local-1622561780883",0,40448,7673,0,55.94,"","","","","","",0,5000,7673,8096,32352,false,"Scan;SerializeFromObject","",30,82
+"Rapids Spark Profiling Tool Unit Tests","local-1622561780883",0,40448,7673,0,55.94,"","","","","","",0,5000,7673,8096,32352,false,"Scan unknown;SerializeFromObject","",30,82
diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
index a3a847ad0..62421d9bf 100644
--- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
@@ -1,5 +1,5 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",11600,132257,16319,9868,37.7,"","","JSON","","","",7143,13770,4719,19744,112513,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1,186
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",11600,132257,16319,9868,37.7,"","","JSON","","","",7143,13770,4719,19744,112513,false,"SerializeFromObject;Scan unknown;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements","",1,186
 "Spark shell","local-1651187225439",224,180,355637,74,87.88,"","JSON[string:bigint:int]","","","","",498,228,355101,120,60,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1,2834
 "Spark shell","local-1651188809790",347,283,166215,14,81.18,"","JSON[string:bigint:int]","","","","UDF",715,318,165572,271,12,false,"CollectLimit;Scan json;Project","UDF",1,1318
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",1156,4666,6240,1,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,1130,5809,4661,5,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1,64
diff --git a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
index f7737d508..79cb8dede 100644
--- a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",395,14353,4872,164,62.67,"","","JSON","","","",1306,794,4477,8376,5977,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30,49
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",395,14353,4872,164,62.67,"","","JSON","","","",1306,794,4477,8376,5977,true,"SerializeFromObject;Scan unknown;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements","",30,49
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala
index 6fc9357cf..e31064851 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala
@@ -276,12 +276,12 @@ class SQLPlanParserSuite extends BasePlanParserSuite {
     val parquet = allExecInfo.filter(_.exec.contains("Scan parquet"))
     val text = allExecInfo.filter(_.exec.contains("Scan text"))
     val csv = allExecInfo.filter(_.exec.contains("Scan csv"))
-    assertSizeAndNotSupported(2, json.toSeq)
-    assertSizeAndNotSupported(1, text.toSeq)
+    assertSizeAndNotSupported(2, json)
+    assertSizeAndNotSupported(1, text)
     for (t <- Seq(parquet, csv)) {
-      assertSizeAndSupported(1, t.toSeq)
+      assertSizeAndSupported(1, t)
     }
-    assertSizeAndSupported(2, orc.toSeq)
+    assertSizeAndSupported(2, orc)
   }
 
   test("BatchScan") {
@@ -299,10 +299,10 @@ class SQLPlanParserSuite extends BasePlanParserSuite {
     val orc = allExecInfo.filter(_.exec.contains("BatchScan orc"))
     val parquet = allExecInfo.filter(_.exec.contains("BatchScan parquet"))
     val csv = allExecInfo.filter(_.exec.contains("BatchScan csv"))
-    assertSizeAndNotSupported(3, json.toSeq)
-    assertSizeAndSupported(1, csv.toSeq)
+    assertSizeAndNotSupported(3, json)
+    assertSizeAndSupported(1, csv)
     for (t <- Seq(orc, parquet)) {
-      assertSizeAndSupported(2, t.toSeq)
+      assertSizeAndSupported(2, t)
     }
   }
 
@@ -1224,7 +1224,8 @@ class SQLPlanParserSuite extends BasePlanParserSuite {
         val projects = allExecInfo.filter(_.exec.contains("Project"))
         assertSizeAndNotSupported(1, projects)
         val expectedExprss = Seq("parse_url_ref", "parse_url_userinfo")
-        projects(0).unsupportedExprs.map(_.exprName) should contain theSameElementsAs expectedExprss
+        projects(0)
+          .unsupportedExprs.map(_.getOpName) should contain theSameElementsAs expectedExprss
       }
     }
   }
@@ -1802,7 +1803,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite {
       val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq)
       val windowExecNotSupportedExprs = allExecInfo.filter(
         _.exec.contains(windowGroupLimitExecCmd)).flatMap(x => x.unsupportedExprs)
-      windowExecNotSupportedExprs.head.exprName shouldEqual "row_number"
+      windowExecNotSupportedExprs.head.getOpName shouldEqual "row_number"
       windowExecNotSupportedExprs.head.unsupportedReason shouldEqual
         "Ranking function row_number is not supported in WindowGroupLimitExec"
       val windowGroupLimitExecs = allExecInfo.filter(_.exec.contains(windowGroupLimitExecCmd))
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
index 3e9988c41..6b7821d3f 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
@@ -241,6 +241,6 @@ class PluginTypeCheckerSuite extends FunSuite with Logging {
     val expressions = SQLPlanParser.parseAggregateExpressions(hashAggregateExpr)
     assert(expressions.contains("decimalsum"))
     val notSupportedExprs = checker.getNotSupportedExprs(expressions)
-    assert(notSupportedExprs.find(_.exprName == "decimalsum").isEmpty)
+    assert(notSupportedExprs.find(_.getOpName == "decimalsum").isEmpty)
   }
 }
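The QualificationSuite change that follows updates a truncated stdout expectation: the unsupported-execs cell is capped at 25 characters and elided with "...". The arithmetic is easy to verify with a small sketch (abbreviate is a hypothetical helper, not the tool's own string utility):

// Hypothetical: cap a report cell at maxLen characters, reserving three for "...".
def abbreviate(s: String, maxLen: Int): String =
  if (s.length <= maxLen) s else s.take(maxLen - 3) + "..."

// abbreviate("Scan unknown;Filter;SerializeFromObject", 25) == "Scan unknown;Filter;Se..."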
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index 57b6f1cb3..03943d463 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -1066,7 +1066,7 @@ class QualificationSuite extends BaseTestSuite {
       val stdOutunsupportedExecs = stdOutValues(stdOutValues.length - 3)
       // index of unsupportedExprs
       val stdOutunsupportedExprs = stdOutValues(stdOutValues.length - 2)
-      val expectedstdOutExecs = "Scan;Filter;SerializeF..."
+      val expectedstdOutExecs = "Scan unknown;Filter;Se..."
       assert(stdOutunsupportedExecs == expectedstdOutExecs)
       // Exec value is Scan;Filter;SerializeFromObject and UNSUPPORTED_EXECS_MAX_SIZE is 25
       val expectedStdOutExecsMaxLength = 25
@@ -1096,7 +1096,7 @@ class QualificationSuite extends BaseTestSuite {
       val rows = outputActual.collect()
       assert(rows.size == 1)
 
-      val expectedExecs = "Scan;Filter;SerializeFromObject" // Unsupported Execs
+      val expectedExecs = "Scan unknown;Filter;SerializeFromObject" // Unsupported Execs
       val expectedExprs = "hex" // Unsupported Exprs
       val unsupportedExecs =
         outputActual.select(QualOutputWriter.UNSUPPORTED_EXECS).first.getString(0)
diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
index 320615b7c..1bdbd76b7 100644
--- a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
+++ b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
@@ -108,7 +108,12 @@
     'sqlOp_CustomShuffleReader',
     'sqlOp_DeserializeToObject',
     'sqlOp_Exchange',
-    'sqlOp_Execute InsertIntoHadoopFsRelationCommand',
+    'sqlOp_Execute InsertIntoHadoopFsRelationCommand csv',
+    'sqlOp_Execute InsertIntoHadoopFsRelationCommand parquet',
+    'sqlOp_Execute InsertIntoHadoopFsRelationCommand orc',
+    'sqlOp_Execute InsertIntoHadoopFsRelationCommand json',
+    'sqlOp_Execute InsertIntoHadoopFsRelationCommand text',
+    'sqlOp_Execute InsertIntoHadoopFsRelationCommand unknown',
     'sqlOp_Expand',
     'sqlOp_Filter',
     'sqlOp_Generate',
@@ -125,15 +130,17 @@
     'sqlOp_Project',
     'sqlOp_ReusedSort',
    'sqlOp_RunningWindowFunction',
-    'sqlOp_Scan csv ',
+    'sqlOp_Scan csv',
     'sqlOp_Scan ExistingRDD Delta Table Checkpoint',
     'sqlOp_Scan ExistingRDD Delta Table State',
-    'sqlOp_Scan JDBCRelation',
-    'sqlOp_Scan json ',
+    'sqlOp_Scan ExistingRDD',
+    'sqlOp_Scan jdbc',
+    'sqlOp_Scan json',
     'sqlOp_Scan OneRowRelation',
-    'sqlOp_Scan orc ',
-    'sqlOp_Scan parquet ',
-    'sqlOp_Scan text ',
+    'sqlOp_Scan orc',
+    'sqlOp_Scan parquet',
+    'sqlOp_Scan text',
+    'sqlOp_Scan unknown',
     'sqlOp_SerializeFromObject',
     'sqlOp_Sort',
     'sqlOp_SortAggregate',
@@ -481,8 +488,8 @@ def combine_tables(table_name: str) -> pd.DataFrame:
         'Scan DeltaCDFRelation',
         'Scan ExistingRDD Delta Table Checkpoint',
         'Scan ExistingRDD Delta Table State',
-        'Scan JDBCRelation',
-        'Scan parquet ',  # trailing space is also in default sql op name
+        'Scan jdbc',
+        'Scan parquet',
         # GPU
         'GpuScan parquet',
     ]
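The qualx feature list above now keys scans and write commands by data format, so the sqlOp_ feature names line up with the operator names the core tool reports: trailing spaces are gone and "unknown" covers unrecognized formats. The mapping from a reported operator name to a feature-column name is mechanical; a hypothetical sketch of that convention:

// Hypothetical: derive a qualx "sqlOp_" feature-column name from a reported operator name.
def toFeatureName(opName: String): String = s"sqlOp_${opName.trim}"

// toFeatureName("Scan parquet") == "sqlOp_Scan parquet"
// toFeatureName("Execute InsertIntoHadoopFsRelationCommand csv")
//   == "sqlOp_Execute InsertIntoHadoopFsRelationCommand csv"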