Skip to content

Commit

Permalink
update documentation and fix
Browse files Browse the repository at this point in the history
Signed-off-by: Niranjan Artal <[email protected]>
  • Loading branch information
nartal1 committed Dec 3, 2024
1 parent d862838 commit 1063058
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class WholeStageExecParserBase(
nodeIdToStagesFunc: Long => Set[Int]) extends Logging {

val fullExecName = "WholeStageCodegenExec"
// Matches the first alphaneumeric characters of a string after trimming leading/trailing
// Matches the first alphanumeric characters of a string after trimming leading/trailing
// white spaces.
val nodeNameRegeX = """^\s*(\w+).*""".r

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import java.util.concurrent.ConcurrentHashMap
import com.nvidia.spark.rapids.tool.planparser.OpTypes


/**
* A reference to an operator(either Exec operator or expression operator) in a Spark plan.
*
* @param value The name of the operator.
* @param opType The type of the operator (e.g., Exec, Expr).
*/
case class OpRef(override val value: String,
override val opType: OpTypes.OpType) extends OperatorRefBase(value, opType)

Expand All @@ -38,10 +44,20 @@ object OpRef {
initMap
}

/**
* Retrieves an `OpRef` for an expression operator.
* If the operator name already exists in the cache, it returns the existing `OpRef`.
* Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Expr`.
*/
def fromExpr(name: String): OpRef = {
OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Expr))
}

/**
* Retrieves an `OpRef` for an exec operator.
* If the operator name already exists in the cache, it returns the existing `OpRef`.
* Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Exec`.
*/
def fromExec(name: String): OpRef = {
OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Exec))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,55 @@ import scala.collection.mutable

import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo}

/**
* `OperatorCounter` is responsible for counting the occurrences of execs and expressions
* in a given execution plan (`PlanInfo`). It maintains counts separately for supported and
* unsupported execs and expressions.
*
* @param planInfo The execution plan information to analyze.
*/
case class OperatorCounter(planInfo: PlanInfo) {

/**
* Represents data for an exec or expression, including its reference,
* occurrence count, and stages where it appears.
*
* @param opRef The operator reference.
* @param count The number of times the operator appears.
* @param stages The set of stages where the operator appears.
*/
case class OperatorData(
opRef: OperatorRefBase,
var count: Int = 0,
var stages: Set[Int] = Set())

// Summarizes the count information for an exec or expression, including whether it is supported.
case class OperatorCountSummary(
opData: OperatorData,
isSupported: Boolean)

private val supportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map()
private val unsupportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map()

// Returns a sequence of `OperatorCountSummary`, combining both supported and
// unsupported operators.
def getOpsCountSummary(): Seq[OperatorCountSummary] = {
supportedMap.values.map(OperatorCountSummary(_, isSupported = true)).toSeq ++
unsupportedMap.values.map(OperatorCountSummary(_, isSupported = false)).toSeq
}


// Updates the operator data in the given map (supported or unsupported).
// Increments the count and updates the stages where the operator appears.
private def updateOpRefEntry(opRef: OperatorRefBase, stages: Set[Int],
targetMap: mutable.Map[OperatorRefBase, OperatorData]): Unit = {
val operatorData = targetMap.getOrElseUpdate(opRef, OperatorData(opRef))
operatorData.count += 1
operatorData.stages ++= stages
}

// Processes an `ExecInfo` node to update exec and expression counts.
// Separates supported and unsupported execs and expressions into their respective maps.
private def processExecInfo(execInfo: ExecInfo): Unit = {
val opMap = execInfo.isSupported match {
case true => supportedMap
Expand All @@ -62,6 +86,7 @@ case class OperatorCounter(planInfo: PlanInfo) {
}
}

// Counts the execs and expressions in the execution plan.
private def countOperators(): Unit = {
planInfo.execInfo.foreach { exec =>
exec.isClusterNode match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@ import com.nvidia.spark.rapids.tool.planparser.OpTypes

import org.apache.spark.sql.rapids.tool.util.StringUtils

/**
* Base class representing a reference to an operator (either exec operator or expression operator).
* It provides methods to retrieve the operator's name and type in both raw and
* CSV-friendly formats.
*
* @param value The name of the operator.
* @param opType The type of the operator (e.g., Exec, Expr).
*/

class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait {
// Preformatted values for CSV output to avoid reformatting multiple times.
val csvValue: String = StringUtils.reformatCSVString(value)
val csvOpType: String = StringUtils.reformatCSVString(opType.toString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,26 @@

package com.nvidia.spark.rapids.tool.planparser.ops


/**
* Represents a reference to an unsupported expression operator.
* Extends `OperatorRefBase` and includes a reason why the expression is unsupported.
*
* @param opRef The underlying `OpRef` for the expression.
* @param unsupportedReason A string describing why the expression is unsupported.
*/
case class UnsupportedExprOpRef(opRef: OpRef,
unsupportedReason: String) extends OperatorRefBase(opRef.value, opRef.opType)

// Provides a factory method to create an instance from an expression name and unsupported reason.
object UnsupportedExprOpRef {
/**
* Creates an `UnsupportedExprOpRef` for the given expression name and unsupported reason.
*
* @param exprName The name of the unsupported expression.
* @param unsupportedReason A string describing why the expression is unsupported.
* @return An instance of `UnsupportedExprOpRef`.
*/
def apply(exprName: String, unsupportedReason: String): UnsupportedExprOpRef = {
val opRef = OpRef.fromExpr(exprName)
UnsupportedExprOpRef(opRef, unsupportedReason)
Expand Down

0 comments on commit 1063058

Please sign in to comment.