Report all operators in the output file #1444

Merged: 8 commits, Dec 4, 2024. Changes shown from 7 commits.

@@ -27,8 +27,8 @@ case class BatchScanExecParser(
checker: PluginTypeChecker,
sqlID: Long,
app: AppBase) extends ExecParser with Logging {

-  val fullExecName = "BatchScanExec"
+  val nodeName = "BatchScan"
+  val fullExecName = nodeName + "Exec"

override def parse: ExecInfo = {
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
@@ -39,8 +39,26 @@ case class BatchScanExecParser(
val speedupFactor = checker.getSpeedupFactor(fullExecName)
val overallSpeedup = Math.max((speedupFactor * score), 1.0)

-    // TODO - add in parsing expressions - average speedup across?
-    ExecInfo(node, sqlID, s"${node.name} ${readInfo.format}", s"Format: ${readInfo.format}",
-      overallSpeedup, maxDuration, node.id, score > 0, None)
+    // 1- Set the exec name to be the batchScan + format
+    // 2- If the format cannot be found, then put the entire node description to make it easy to
+    //    troubleshoot by reading the output files.
+    val readFormat = readInfo.getReadFormatLC
+    val execExpression = if (readInfo.hasUnknownFormat) {
+      node.desc
+    } else {
+      s"Format: $readFormat"
+    }
+
+    ExecInfo.createExecNoNode(
+      sqlID = sqlID,
+      exec = s"$nodeName $readFormat",
+      expr = execExpression,
+      speedupFactor = overallSpeedup,
+      duration = maxDuration,
+      nodeId = node.id,
+      opType = OpTypes.ReadExec,
+      isSupported = score > 0.0,
+      children = None,
+      expressions = Seq.empty)
}
}
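
To see what the new fallback yields in the output, here is a minimal, self-contained sketch of the same logic (the "unknown" literal stands in for ReadParser.UNKNOWN_METAFIELD, which is an assumption here):

  // Known formats report a compact "Format: <fmt>" expression; unknown formats
  // fall back to the full node description so it lands in the output files.
  def execExpr(format: String, nodeDesc: String): String =
    if (format.equalsIgnoreCase("unknown")) nodeDesc
    else s"Format: ${format.toLowerCase}"

  execExpr("Parquet", "BatchScan parquet [id#0] ...")    // "Format: parquet"
  execExpr("unknown", "BatchScan someSource [id#0] ...") // whole description, easier to troubleshoot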
@@ -44,8 +44,7 @@ case class BroadcastExchangeExecParser(
} else {
(1.0, false)
}
-    // TODO - add in parsing expressions - average speedup across?
    ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor,
-      duration, node.id, isSupported, None)
+      duration, node.id, isSupported, children = None, expressions = Seq.empty)
}
}
@@ -30,7 +30,7 @@ case class BroadcastHashJoinExecParser(
override def parse: ExecInfo = {
// BroadcastHashJoin doesn't have duration
val duration = None
-    val exprString = node.desc.replaceFirst("BroadcastHashJoin ", "")
+    val exprString = node.desc.replaceFirst("^BroadcastHashJoin\\s*", "")
val (expressions, supportedJoinType) = SQLPlanParser.parseEquijoinsExpressions(exprString)
val notSupportedExprs = expressions.filterNot(expr => checker.isExprSupported(expr))
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) &&
@@ -39,7 +39,7 @@
} else {
(1.0, false)
}
-    // TODO - add in parsing expressions - average speedup across?
-    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
+    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported,
+      children = None, expressions = expressions)
}
}
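
A quick REPL check of why the replacement pattern is now anchored (strings invented for illustration):

  "BroadcastHashJoin [a#1], [b#2], Inner".replaceFirst("^BroadcastHashJoin\\s*", "")
  // -> "[a#1], [b#2], Inner"
  "BroadcastHashJoin".replaceFirst("BroadcastHashJoin ", "")
  // -> "BroadcastHashJoin"; the old literal pattern needs the trailing space to match
  "BroadcastHashJoin".replaceFirst("^BroadcastHashJoin\\s*", "")
  // -> ""; "\\s*" tolerates zero or more spaces and "^" pins the match to the start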
@@ -39,7 +39,7 @@ abstract class BroadcastNestedLoopJoinExecParserBase(

override def parse: ExecInfo = {
// BroadcastNestedLoopJoin doesn't have duration
-    val exprString = node.desc.replaceFirst("BroadcastNestedLoopJoin ", "")
+    val exprString = node.desc.replaceFirst("^BroadcastNestedLoopJoin\\s*", "")
val (buildSide, joinType) = extractBuildAndJoinTypes(exprString)
val (expressions, supportedJoinType) =
SQLPlanParser.parseNestedLoopJoinExpressions(exprString, buildSide, joinType)
@@ -51,8 +51,8 @@
} else {
(1.0, false)
}
-    // TODO - add in parsing expressions - average speedup across?
-    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
+    ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported,
+      children = None, expressions = expressions)
}
}

@@ -37,11 +37,11 @@ case class DataWritingCommandExecParser(
val duration = None
val speedupFactor = checker.getSpeedupFactor(wStub.mappedExec)
val finalSpeedup = if (writeSupported) speedupFactor else 1
-    // TODO - add in parsing expressions - average speedup across?
    // We do not want to parse the node description to avoid mistakenly marking the node as RDD/UDF
    ExecInfo.createExecNoNode(sqlID, s"${node.name.trim} ${wStub.dataFormat.toLowerCase.trim}",
      s"Format: ${wStub.dataFormat.toLowerCase.trim}",
-      finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported, None)
+      finalSpeedup, duration, node.id, opType = OpTypes.WriteExec, writeSupported,
+      children = None, expressions = Seq.empty)
}
}

@@ -175,4 +175,4 @@ object DataWritingCommandExecParser {
parsedString.split(",")(0) // return third parameter from the input string
}
}
-}
\ No newline at end of file
+}
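
To make the "third parameter" comment above concrete, a sketch assuming parsedString was built by splitting the node description with a limit of 3 (the description string is made up):

  val desc = "Execute InsertIntoHadoopFsRelationCommand /tmp/out, false, Parquet, Map(...)"
  val parsedString = desc.split(",", 3).last // " Parquet, Map(...)": everything from the third field on
  parsedString.split(",")(0)                 // " Parquet": the third field of the original string

The leading space is harmless because the caller trims the format (wStub.dataFormat.toLowerCase.trim above).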
@@ -45,10 +45,10 @@ class DLWriteWithFormatAndSchemaParser(node: SparkPlanGraphNode,
val finalSpeedupFactor = if (writeSupported) speedupFactor else 1.0

// execs like SaveIntoDataSourceCommand has prefix "Execute". So, we need to get rid of it.
-    val nodeName = node.name.replace("Execute ", "")
+    val nodeName = node.name.replaceFirst("Execute\\s*", "")
ExecInfo.createExecNoNode(sqlID, nodeName,
s"Format: $dataFormat", finalSpeedupFactor, None, node.id, OpTypes.WriteExec,
-      isSupported = writeSupported && isExecSupported, children = None)
+      isSupported = writeSupported && isExecSupported, children = None, expressions = Seq.empty)
}
}
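
A small check of the new prefix handling (illustrative):

  "Execute SaveIntoDataSourceCommand".replaceFirst("Execute\\s*", "")
  // -> "SaveIntoDataSourceCommand"
  // Unlike the old replace("Execute ", ""), replaceFirst strips at most one
  // occurrence and tolerates any run of whitespace after "Execute".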

@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.planparser

-import org.apache.spark.sql.rapids.tool.UnsupportedExpr
+import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef

trait ExecParser {
def parse: ExecInfo
@@ -32,5 +32,6 @@ trait ExecParser {
* @param expressions Array of expression strings to evaluate for support.
* @return Empty Seq[UnsupportedExpr], indicating no unsupported expressions by default.
*/
-  def getUnsupportedExprReasonsForExec(expressions: Array[String]): Seq[UnsupportedExpr] = Seq.empty
+  def getUnsupportedExprReasonsForExec(
+      expressions: Array[String]): Seq[UnsupportedExprOpRef] = Seq.empty
}
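
For implementers, a sketch of how a concrete parser can override the default, modeled on GenericExecParser.getNotSupportedExprs further below (a checker: PluginTypeChecker field is assumed in scope):

  override def getUnsupportedExprReasonsForExec(
      expressions: Array[String]): Seq[UnsupportedExprOpRef] = {
    // Delegate to the plugin type checker instead of returning the empty default
    checker.getNotSupportedExprs(expressions)
  }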
@@ -30,14 +30,17 @@ case class FileSourceScanExecParser(

// The node name for Scans is Scan <format> so here we hardcode
val fullExecName = "FileSourceScanExec"
+  // Matches the first alphanumeric characters of a string after trimming leading/trailing
+  // white spaces.
+  val nodeNameRegeX = """^\s*(\w+).*""".r

override def parse: ExecInfo = {
// Remove trailing spaces from node name
// Example: Scan parquet . -> Scan parquet.
val nodeName = node.name.trim
val rddCheckRes = RDDCheckHelper.isDatasetOrRDDPlan(nodeName, node.desc)
if (rddCheckRes.nodeNameRDD) {
-      // This is a scanRDD. we do not need to parse it as a normal node.
+      // This is a scanRDD. We do not need to parse it as a normal node.
// cleanup the node name if possible:
val newNodeName = if (nodeName.contains("ExistingRDD")) {
val nodeNameLength = nodeName.indexOf("ExistingRDD") + "ExistingRDD".length
@@ -46,7 +49,7 @@
nodeName
}
ExecInfo.createExecNoNode(sqlID, newNodeName, "", 1.0, duration = None,
-        node.id, OpTypes.ReadRDD, false, None)
+        node.id, OpTypes.ReadRDD, false, children = None, expressions = Seq.empty)
} else {
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)
@@ -57,14 +60,36 @@
// Use the default parser
(fullExecName, ReadParser.parseReadNode(node))
}
+      // 1- Set the exec name to nodeLabel + format
+      // 2- If the format is not found, then put the entire node description to make it easy to
+      //    troubleshoot by reading the output files.
+      val nodeLabel = nodeNameRegeX.findFirstMatchIn(nodeName) match {
+        case Some(m) => m.group(1)
+        // in case not found, use the full exec name
+        case None => execName
+      }
+      val readFormat = readInfo.getReadFormatLC
+      val exexExpr = if (readInfo.hasUnknownFormat) {
+        node.desc
+      } else {
+        s"Format: ${readFormat}"
+      }
val speedupFactor = checker.getSpeedupFactor(execName)
// don't use the isExecSupported because we have finer grain.
val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
val overallSpeedup = Math.max(speedupFactor * score, 1.0)

-      // TODO - add in parsing expressions - average speedup across?
-      ExecInfo.createExecNoNode(sqlID, nodeName, "", overallSpeedup, maxDuration,
-        node.id, OpTypes.ReadExec, score > 0, None)
+      ExecInfo.createExecNoNode(
+        sqlID = sqlID,
+        exec = s"$nodeLabel $readFormat",
+        expr = exexExpr,
+        speedupFactor = overallSpeedup,
+        duration = maxDuration,
+        nodeId = node.id,
+        opType = OpTypes.ReadExec,
+        isSupported = score > 0,
+        children = None,
+        expressions = Seq.empty)
}
}
}
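
A worked example of the node-label extraction above (node names invented):

  val nodeNameRegeX = """^\s*(\w+).*""".r
  nodeNameRegeX.findFirstMatchIn("  Scan parquet db.tbl ").map(_.group(1))
  // -> Some("Scan")
  nodeNameRegeX.findFirstMatchIn("???").map(_.group(1))
  // -> None; the parser then falls back to execName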
@@ -16,10 +16,11 @@

package com.nvidia.spark.rapids.tool.planparser

+import com.nvidia.spark.rapids.tool.planparser.ops.UnsupportedExprOpRef
import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
-import org.apache.spark.sql.rapids.tool.{AppBase, UnsupportedExpr}
+import org.apache.spark.sql.rapids.tool.AppBase

class GenericExecParser(
val node: SparkPlanGraphNode,
@@ -46,7 +47,8 @@ class GenericExecParser(
(1.0, false)
}

-    createExecInfo(speedupFactor, isSupported, duration, notSupportedExprs)
+    createExecInfo(speedupFactor, isSupported, duration,
+      notSupportedExprs = notSupportedExprs, expressions = expressions)
}

protected def parseExpressions(): Array[String] = {
@@ -63,7 +65,7 @@
node.desc.replaceFirst(s"^${node.name}\\s*", "")
}

-  protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExpr] = {
+  protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExprOpRef] = {
checker.getNotSupportedExprs(expressions)
}

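The expression text handed to parseExpressions is simply the node description with the leading operator name stripped, for example (made-up Filter node):

  val name = "Filter"
  val desc = "Filter (isnotnull(id#1) AND (id#1 > 5))"
  desc.replaceFirst(s"^${name}\\s*", "") // -> "(isnotnull(id#1) AND (id#1 > 5))"

Note that interpolating node.name into the pattern assumes operator names carry no regex metacharacters.
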
@@ -83,7 +85,8 @@
speedupFactor: Double,
isSupported: Boolean,
duration: Option[Long],
-      notSupportedExprs: Seq[UnsupportedExpr]
+      notSupportedExprs: Seq[UnsupportedExprOpRef],
+      expressions: Array[String]
): ExecInfo = {
ExecInfo(
node,
@@ -95,7 +98,8 @@
node.id,
isSupported,
None,
-      unsupportedExprs = notSupportedExprs
+      unsupportedExprs = notSupportedExprs,
+      expressions = expressions
)
}
}
@@ -29,11 +29,21 @@ case class ReadMetaData(schema: String, location: String, format: String,
def pushedFilters: String = tags(ReadParser.METAFIELD_TAG_PUSHED_FILTERS)
def dataFilters: String = tags(ReadParser.METAFIELD_TAG_DATA_FILTERS)
def partitionFilters: String = tags(ReadParser.METAFIELD_TAG_PARTITION_FILTERS)
+
+  def hasUnknownFormat: Boolean = format.equals(ReadParser.UNKNOWN_METAFIELD)
+
+  /**
+   * Returns the read format in lowercase, so that formats are reported consistently.
+   * @return the lower case of the read format
+   */
+  def getReadFormatLC: String = format.toLowerCase
}

object ReadParser extends Logging {
// It was found that some eventlogs could have "NativeScan" instead of "Scan"
val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan")
+  // Do not include OneRowRelation in the scan nodes; consider it a regular Exec
+  val SCAN_ONE_ROW_RELATION = "Scan OneRowRelation"
// DatasourceV2 node names that exactly match the following labels
val DATASOURCE_V2_NODE_EXACT_PREF = Set(
"BatchScan")
@@ -58,7 +68,7 @@
)

def isScanNode(nodeName: String): Boolean = {
-    SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_))
+    SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) && !nodeName.startsWith(SCAN_ONE_ROW_RELATION)
}

def isScanNode(node: SparkPlanGraphNode): Boolean = {
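
Expected behavior of the updated check, as a quick sanity list (node names illustrative):

  ReadParser.isScanNode("Scan parquet db.tbl")   // true
  ReadParser.isScanNode("NativeScan orc db.tbl") // true: alternate prefix
  ReadParser.isScanNode("Scan OneRowRelation")   // false: excluded, treated as a regular Exec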