Report all operators in the output file (#1444)
* first iteration: all the UTs pass

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

* running end-to-end

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

* cleaned-up exec parsers

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

* update documentation and fix

Signed-off-by: Niranjan Artal <[email protected]>

* qualx related changes

* update qualx changes

* Fix for Scan OneRowRelation

Signed-off-by: Niranjan Artal <[email protected]>

* addressed review comments

---------

Signed-off-by: Niranjan Artal <[email protected]>
Co-authored-by: Ahmed Hussein (amahussein) <[email protected]>
nartal1 and amahussein authored Dec 4, 2024
1 parent 091af08 commit 993bc8f
Showing 40 changed files with 574 additions and 119 deletions.
@@ -27,8 +27,8 @@ case class BatchScanExecParser(
     checker: PluginTypeChecker,
     sqlID: Long,
     app: AppBase) extends ExecParser with Logging {
 
-  val fullExecName = "BatchScanExec"
+  val nodeName = "BatchScan"
+  val fullExecName = nodeName + "Exec"
 
   override def parse: ExecInfo = {
     val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
@@ -39,8 +39,26 @@
     val speedupFactor = checker.getSpeedupFactor(fullExecName)
     val overallSpeedup = Math.max((speedupFactor * score), 1.0)
 
-    // TODO - add in parsing expressions - average speedup across?
-    ExecInfo(node, sqlID, s"${node.name} ${readInfo.format}", s"Format: ${readInfo.format}",
-      overallSpeedup, maxDuration, node.id, score > 0, None)
+    // 1- Set the exec name to be the batchScan + format
+    // 2- If the format cannot be found, then put the entire node description to make it easy to
+    // troubleshoot by reading the output files.
+    val readFormat = readInfo.getReadFormatLC
+    val execExpression = if (readInfo.hasUnknownFormat) {
+      node.desc
+    } else {
+      s"Format: $readFormat"
+    }
+
+    ExecInfo.createExecNoNode(
+      sqlID = sqlID,
+      exec = s"$nodeName $readFormat",
+      expr = execExpression,
+      speedupFactor = overallSpeedup,
+      duration = maxDuration,
+      nodeId = node.id,
+      opType = OpTypes.ReadExec,
+      isSupported = score > 0.0,
+      children = None,
+      expressions = Seq.empty)
   }
 }
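
The change above is the pattern this commit applies across scan parsers: the exec column becomes the node label plus the lower-cased read format, and the expression column falls back to the full node description when the format is unknown. A minimal standalone sketch, not part of the commit — ReadMetaLite is a made-up stand-in for ReadMetaData, and the value of ReadParser.UNKNOWN_METAFIELD is assumed to be "unknown":

    // Sketch only: simplified stand-in for ReadMetaData.
    case class ReadMetaLite(format: String) {
      def getReadFormatLC: String = format.toLowerCase
      def hasUnknownFormat: Boolean = getReadFormatLC == "unknown" // assumed constant value
    }

    object BatchScanReportSketch {
      // Mirrors the exec/expr construction in parse() above.
      def report(nodeName: String, nodeDesc: String, meta: ReadMetaLite): (String, String) = {
        val readFormat = meta.getReadFormatLC
        // Unknown format: report the whole node description so it stays debuggable.
        val expr = if (meta.hasUnknownFormat) nodeDesc else s"Format: $readFormat"
        (s"$nodeName $readFormat", expr)
      }
      // report("BatchScan", "BatchScan ... full desc", ReadMetaLite("Parquet"))
      //   => ("BatchScan parquet", "Format: parquet")
      // report("BatchScan", "BatchScan ... full desc", ReadMetaLite("unknown"))
      //   => ("BatchScan unknown", "BatchScan ... full desc")
    }
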
@@ -44,8 +44,8 @@ case class BroadcastExchangeExecParser(
     } else {
       (1.0, false)
     }
-    // TODO - add in parsing expressions - average speedup across?
     ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor,
-      duration, node.id, isSupported, None)
+      duration, node.id, isSupported, children = None, expressions = Seq.empty)
   }
 }
@@ -30,7 +30,7 @@ case class BroadcastHashJoinExecParser(
   override def parse: ExecInfo = {
     // BroadcastHashJoin doesn't have duration
     val duration = None
-    val exprString = node.desc.replaceFirst("BroadcastHashJoin ", "")
+    val exprString = node.desc.replaceFirst("^BroadcastHashJoin\\s*", "")
     val (expressions, supportedJoinType) = SQLPlanParser.parseEquijoinsExpressions(exprString)
     val notSupportedExprs = expressions.filterNot(expr => checker.isExprSupported(expr))
     val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) &&
@@ -39,7 +39,7 @@
     } else {
       (1.0, false)
     }
-    // TODO - add in parsing expressions - average speedup across?
-    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
+    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported,
+      children = None, expressions = expressions)
   }
 }
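
Anchoring the pattern (^...\s*) strips the operator name only when it is the leading token, and also copes with descriptions that lack a trailing space; the same change is applied to BroadcastNestedLoopJoin below. A standalone illustration with example description strings, not taken from a real plan:

    object JoinPrefixSketch {
      val desc = "BroadcastHashJoin [a#1], [b#2], Inner, BuildRight, false"
      // Anchored pattern: removes the prefix and any whitespace run after it, once.
      val exprString = desc.replaceFirst("^BroadcastHashJoin\\s*", "")
      // exprString == "[a#1], [b#2], Inner, BuildRight, false"

      // The old literal "BroadcastHashJoin " required a trailing space, so a bare
      // operator name was left untouched; the anchored form handles it too.
      val bare = "BroadcastHashJoin".replaceFirst("^BroadcastHashJoin\\s*", "")
      // bare == ""
    }
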
@@ -39,7 +39,7 @@ abstract class BroadcastNestedLoopJoinExecParserBase(
 
   override def parse: ExecInfo = {
     // BroadcastNestedLoopJoin doesn't have duration
-    val exprString = node.desc.replaceFirst("BroadcastNestedLoopJoin ", "")
+    val exprString = node.desc.replaceFirst("^BroadcastNestedLoopJoin\\s*", "")
     val (buildSide, joinType) = extractBuildAndJoinTypes(exprString)
     val (expressions, supportedJoinType) =
       SQLPlanParser.parseNestedLoopJoinExpressions(exprString, buildSide, joinType)
@@ -51,8 +51,8 @@
     } else {
       (1.0, false)
     }
-    // TODO - add in parsing expressions - average speedup across?
-    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
+    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported,
+      children = None, expressions = expressions)
   }
 }
 
@@ -37,11 +37,11 @@ case class DataWritingCommandExecParser(
     val duration = None
     val speedupFactor = checker.getSpeedupFactor(wStub.mappedExec)
     val finalSpeedup = if (writeSupported) speedupFactor else 1
-    // TODO - add in parsing expressions - average speedup across?
     // We do not want to parse the node description to avoid mistakenly marking the node as RDD/UDF
     ExecInfo.createExecNoNode(sqlID, s"${node.name.trim} ${wStub.dataFormat.toLowerCase.trim}",
       s"Format: ${wStub.dataFormat.toLowerCase.trim}",
-      finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported, None)
+      finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported,
+      children = None, expressions = Seq.empty)
   }
 }
 
@@ -175,4 +175,4 @@ object DataWritingCommandExecParser {
     parsedString.split(",")(0) // return third parameter from the input string
   }
 }
-}
+}
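
The write-exec label keeps its existing shape — trimmed node name plus lower-cased data format — with the expressions column now explicitly empty. For concreteness, a sketch with illustrative values only:

    object WriteLabelSketch {
      val nodeName = "Execute InsertIntoHadoopFsRelationCommand "
      val dataFormat = "Parquet"
      // Same string construction as the createExecNoNode call above.
      val exec = s"${nodeName.trim} ${dataFormat.toLowerCase.trim}"
      // exec == "Execute InsertIntoHadoopFsRelationCommand parquet"
      val expr = s"Format: ${dataFormat.toLowerCase.trim}"
      // expr == "Format: parquet"
    }
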
@@ -45,10 +45,10 @@ class DLWriteWithFormatAndSchemaParser(node: SparkPlanGraphNode,
     val finalSpeedupFactor = if (writeSupported) speedupFactor else 1.0
 
     // execs like SaveIntoDataSourceCommand has prefix "Execute". So, we need to get rid of it.
-    val nodeName = node.name.replace("Execute ", "")
+    val nodeName = node.name.replaceFirst("Execute\\s*", "")
     ExecInfo.createExecNoNode(sqlID, nodeName,
       s"Format: $dataFormat", finalSpeedupFactor, None, node.id, OpTypes.WriteExec,
-      isSupported = writeSupported && isExecSupported, children = None)
+      isSupported = writeSupported && isExecSupported, children = None, expressions = Seq.empty)
   }
 }
 
@@ -16,7 +16,7 @@
 
 package com.nvidia.spark.rapids.tool.planparser
 
-import org.apache.spark.sql.rapids.tool.UnsupportedExpr
+import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef
 
 trait ExecParser {
   def parse: ExecInfo
@@ -32,5 +32,6 @@ trait ExecParser {
    * @param expressions Array of expression strings to evaluate for support.
    * @return Empty Seq[UnsupportedExpr], indicating no unsupported expressions by default.
    */
-  def getUnsupportedExprReasonsForExec(expressions: Array[String]): Seq[UnsupportedExpr] = Seq.empty
+  def getUnsupportedExprReasonsForExec(
+      expressions: Array[String]): Seq[UnsupportedExprOpRef] = Seq.empty
 }
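
Parsers that do expression-level checking override this hook; the default keeps simple parsers trivial. A hypothetical override — sketch only, assuming parse is the trait's only other abstract member (this excerpt doesn't show more) and reusing checker.getNotSupportedExprs, which returns Seq[UnsupportedExprOpRef] as seen in GenericExecParser below:

    class MyExecParser(checker: PluginTypeChecker) extends ExecParser {
      override def parse: ExecInfo = ??? // out of scope for this sketch
      override def getUnsupportedExprReasonsForExec(
          expressions: Array[String]): Seq[UnsupportedExprOpRef] = {
        // Delegate support lookup to the plugin type checker.
        checker.getNotSupportedExprs(expressions)
      }
    }
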
@@ -30,14 +30,17 @@ case class FileSourceScanExecParser(
 
   // The node name for Scans is Scan <format> so here we hardcode
   val fullExecName = "FileSourceScanExec"
+  // Matches the first alphanumeric characters of a string after trimming leading/trailing
+  // white spaces.
+  val nodeNameRegeX = """^\s*(\w+).*""".r
 
   override def parse: ExecInfo = {
     // Remove trailing spaces from node name
     // Example: Scan parquet . -> Scan parquet.
     val nodeName = node.name.trim
     val rddCheckRes = RDDCheckHelper.isDatasetOrRDDPlan(nodeName, node.desc)
     if (rddCheckRes.nodeNameRDD) {
-      // This is a scanRDD. we do not need to parse it as a normal node.
+      // This is a scanRDD. We do not need to parse it as a normal node.
       // cleanup the node name if possible:
       val newNodeName = if (nodeName.contains("ExistingRDD")) {
         val nodeNameLength = nodeName.indexOf("ExistingRDD") + "ExistingRDD".length
@@ -46,7 +49,7 @@
         nodeName
       }
       ExecInfo.createExecNoNode(sqlID, newNodeName, "", 1.0, duration = None,
-        node.id, OpTypes.ReadRDD, false, None)
+        node.id, OpTypes.ReadRDD, false, children = None, expressions = Seq.empty)
     } else {
       val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
       val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)
@@ -57,14 +60,36 @@
         // Use the default parser
         (fullExecName, ReadParser.parseReadNode(node))
       }
+      // 1- Set the exec name to nodeLabel + format
+      // 2- If the format is not found, then put the entire node description to make it easy to
+      // troubleshoot by reading the output files.
+      val nodeLabel = nodeNameRegeX.findFirstMatchIn(nodeName) match {
+        case Some(m) => m.group(1)
+        // in case not found, use the full exec name
+        case None => execName
+      }
+      val readFormat = readInfo.getReadFormatLC
+      val exexExpr = if (readInfo.hasUnknownFormat) {
+        node.desc
+      } else {
+        s"Format: ${readFormat}"
+      }
       val speedupFactor = checker.getSpeedupFactor(execName)
       // don't use the isExecSupported because we have finer grain.
       val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
       val overallSpeedup = Math.max(speedupFactor * score, 1.0)
 
-      // TODO - add in parsing expressions - average speedup across?
-      ExecInfo.createExecNoNode(sqlID, nodeName, "", overallSpeedup, maxDuration,
-        node.id, OpTypes.ReadExec, score > 0, None)
+      ExecInfo.createExecNoNode(
+        sqlID = sqlID,
+        exec = s"$nodeLabel $readFormat",
+        expr = exexExpr,
+        speedupFactor = overallSpeedup,
+        duration = maxDuration,
+        nodeId = node.id,
+        opType = OpTypes.ReadExec,
+        isSupported = score > 0,
+        children = None,
+        expressions = Seq.empty)
     }
   }
 }
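
The nodeNameRegeX capture reduces a scan node's name to its leading token, so the report gets a stable label such as "Scan" or "NativeScan" regardless of the table/format suffix. A standalone sketch; the example node names are made up:

    object NodeLabelSketch {
      // Same regex as above: first word after optional leading whitespace.
      val nodeNameRegeX = """^\s*(\w+).*""".r

      def nodeLabel(nodeName: String, fallbackExecName: String): String =
        nodeNameRegeX.findFirstMatchIn(nodeName) match {
          case Some(m) => m.group(1)
          case None => fallbackExecName // no leading word characters at all
        }

      // nodeLabel("Scan parquet spark_catalog.default.t1", "FileSourceScanExec") => "Scan"
      // nodeLabel("NativeScan orc db.tbl", "FileSourceScanExec")                 => "NativeScan"
      // nodeLabel("???", "FileSourceScanExec")                                   => "FileSourceScanExec"
    }
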
@@ -16,10 +16,11 @@
 
 package com.nvidia.spark.rapids.tool.planparser
 
+import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef
 import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker
 
 import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
-import org.apache.spark.sql.rapids.tool.{AppBase, UnsupportedExpr}
+import org.apache.spark.sql.rapids.tool.AppBase
 
 class GenericExecParser(
     val node: SparkPlanGraphNode,
@@ -46,7 +47,8 @@ class GenericExecParser(
       (1.0, false)
     }
 
-    createExecInfo(speedupFactor, isSupported, duration, notSupportedExprs)
+    createExecInfo(speedupFactor, isSupported, duration,
+      notSupportedExprs = notSupportedExprs, expressions = expressions)
   }
 
   protected def parseExpressions(): Array[String] = {
@@ -63,7 +65,7 @@
     node.desc.replaceFirst(s"^${node.name}\\s*", "")
   }
 
-  protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExpr] = {
+  protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExprOpRef] = {
     checker.getNotSupportedExprs(expressions)
   }
 
@@ -83,7 +85,8 @@ class GenericExecParser(
       speedupFactor: Double,
      isSupported: Boolean,
       duration: Option[Long],
-      notSupportedExprs: Seq[UnsupportedExpr]
+      notSupportedExprs: Seq[UnsupportedExprOpRef],
+      expressions: Array[String]
   ): ExecInfo = {
     ExecInfo(
       node,
@@ -95,7 +98,8 @@ class GenericExecParser(
       node.id,
       isSupported,
       None,
-      unsupportedExprs = notSupportedExprs
+      unsupportedExprs = notSupportedExprs,
+      expressions = expressions
     )
   }
 }
@@ -29,11 +29,21 @@ case class ReadMetaData(schema: String, location: String, format: String,
   def pushedFilters: String = tags(ReadParser.METAFIELD_TAG_PUSHED_FILTERS)
   def dataFilters: String = tags(ReadParser.METAFIELD_TAG_DATA_FILTERS)
   def partitionFilters: String = tags(ReadParser.METAFIELD_TAG_PARTITION_FILTERS)
+
+  def hasUnknownFormat: Boolean = format.equals(ReadParser.UNKNOWN_METAFIELD)
+
+  /**
+   * Returns the read format in lowercase. This is used to be consistent.
+   * @return the lower case of the read format
+   */
+  def getReadFormatLC: String = format.toLowerCase
 }
 
 object ReadParser extends Logging {
   // It was found that some eventlogs could have "NativeScan" instead of "Scan"
   val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan")
+  // Do not include OneRowRelation in the scan nodes, consider it as regular Exec
+  val SCAN_ONE_ROW_RELATION = "Scan OneRowRelation"
   // DatasourceV2 node names that exactly match the following labels
   val DATASOURCE_V2_NODE_EXACT_PREF = Set(
     "BatchScan")
@@ -58,7 +68,7 @@
   )
 
   def isScanNode(nodeName: String): Boolean = {
-    SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_))
+    SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) && !nodeName.startsWith(SCAN_ONE_ROW_RELATION)
   }
 
   def isScanNode(node: SparkPlanGraphNode): Boolean = {
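
With the new guard, a Scan OneRowRelation node no longer counts as a scan and is handled as a regular exec instead. A behavior sketch using the constants defined above:

    object ScanNodeSketch {
      val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan")
      val SCAN_ONE_ROW_RELATION = "Scan OneRowRelation"

      // Same predicate as ReadParser.isScanNode after this commit.
      def isScanNode(nodeName: String): Boolean = {
        SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) &&
          !nodeName.startsWith(SCAN_ONE_ROW_RELATION)
      }

      // isScanNode("Scan parquet db.tbl")   => true
      // isScanNode("NativeScan orc db.tbl") => true
      // isScanNode("Scan OneRowRelation")   => false (regular exec path)
      // isScanNode("Project")               => false
    }
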
(Diff truncated; the remaining changed files are not shown.)
