From 106305837b97b3a44b09c4bde87692c062931669 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Dec 2024 13:28:10 -0800 Subject: [PATCH] update documentation and fix Signed-off-by: Niranjan Artal --- .../planparser/WholeStageExecParser.scala | 2 +- .../rapids/tool/planparser/ops/OpRef.scala | 16 ++++++++++++ .../tool/planparser/ops/OperatorCounter.scala | 25 +++++++++++++++++++ .../tool/planparser/ops/OperatorRefBase.scala | 10 ++++++++ .../planparser/ops/UnsupportedExprOpRef.scala | 16 ++++++++++++ 5 files changed, 68 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala index d0ff5c4ff..5b8730938 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala @@ -31,7 +31,7 @@ abstract class WholeStageExecParserBase( nodeIdToStagesFunc: Long => Set[Int]) extends Logging { val fullExecName = "WholeStageCodegenExec" - // Matches the first alphaneumeric characters of a string after trimming leading/trailing + // Matches the first alphanumeric characters of a string after trimming leading/trailing // white spaces. val nodeNameRegeX = """^\s*(\w+).*""".r diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala index 18c6613a9..845efa696 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OpRef.scala @@ -21,6 +21,12 @@ import java.util.concurrent.ConcurrentHashMap import com.nvidia.spark.rapids.tool.planparser.OpTypes +/** + * A reference to an operator(either Exec operator or expression operator) in a Spark plan. + * + * @param value The name of the operator. + * @param opType The type of the operator (e.g., Exec, Expr). + */ case class OpRef(override val value: String, override val opType: OpTypes.OpType) extends OperatorRefBase(value, opType) @@ -38,10 +44,20 @@ object OpRef { initMap } + /** + * Retrieves an `OpRef` for an expression operator. + * If the operator name already exists in the cache, it returns the existing `OpRef`. + * Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Expr`. + */ def fromExpr(name: String): OpRef = { OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Expr)) } + /** + * Retrieves an `OpRef` for an exec operator. + * If the operator name already exists in the cache, it returns the existing `OpRef`. + * Otherwise, it creates a new `OpRef` with the given name and `OpTypes.Exec`. + */ def fromExec(name: String): OpRef = { OP_NAMES.computeIfAbsent(name, OpRef.apply(_, OpTypes.Exec)) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala index 9e5a76308..fab9422aa 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorCounter.scala @@ -20,12 +20,29 @@ import scala.collection.mutable import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo} +/** + * `OperatorCounter` is responsible for counting the occurrences of execs and expressions + * in a given execution plan (`PlanInfo`). It maintains counts separately for supported and + * unsupported execs and expressions. + * + * @param planInfo The execution plan information to analyze. + */ case class OperatorCounter(planInfo: PlanInfo) { + + /** + * Represents data for an exec or expression, including its reference, + * occurrence count, and stages where it appears. + * + * @param opRef The operator reference. + * @param count The number of times the operator appears. + * @param stages The set of stages where the operator appears. + */ case class OperatorData( opRef: OperatorRefBase, var count: Int = 0, var stages: Set[Int] = Set()) + // Summarizes the count information for an exec or expression, including whether it is supported. case class OperatorCountSummary( opData: OperatorData, isSupported: Boolean) @@ -33,11 +50,16 @@ case class OperatorCounter(planInfo: PlanInfo) { private val supportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() private val unsupportedMap: mutable.Map[OperatorRefBase, OperatorData] = mutable.Map() + // Returns a sequence of `OperatorCountSummary`, combining both supported and + // unsupported operators. def getOpsCountSummary(): Seq[OperatorCountSummary] = { supportedMap.values.map(OperatorCountSummary(_, isSupported = true)).toSeq ++ unsupportedMap.values.map(OperatorCountSummary(_, isSupported = false)).toSeq } + + // Updates the operator data in the given map (supported or unsupported). + // Increments the count and updates the stages where the operator appears. private def updateOpRefEntry(opRef: OperatorRefBase, stages: Set[Int], targetMap: mutable.Map[OperatorRefBase, OperatorData]): Unit = { val operatorData = targetMap.getOrElseUpdate(opRef, OperatorData(opRef)) @@ -45,6 +67,8 @@ case class OperatorCounter(planInfo: PlanInfo) { operatorData.stages ++= stages } + // Processes an `ExecInfo` node to update exec and expression counts. + // Separates supported and unsupported execs and expressions into their respective maps. private def processExecInfo(execInfo: ExecInfo): Unit = { val opMap = execInfo.isSupported match { case true => supportedMap @@ -62,6 +86,7 @@ case class OperatorCounter(planInfo: PlanInfo) { } } + // Counts the execs and expressions in the execution plan. private def countOperators(): Unit = { planInfo.execInfo.foreach { exec => exec.isClusterNode match { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala index c4c54a692..04bd9a190 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/OperatorRefBase.scala @@ -20,7 +20,17 @@ import com.nvidia.spark.rapids.tool.planparser.OpTypes import org.apache.spark.sql.rapids.tool.util.StringUtils +/** + * Base class representing a reference to an operator (either exec operator or expression operator). + * It provides methods to retrieve the operator's name and type in both raw and + * CSV-friendly formats. + * + * @param value The name of the operator. + * @param opType The type of the operator (e.g., Exec, Expr). + */ + class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait { + // Preformatted values for CSV output to avoid reformatting multiple times. val csvValue: String = StringUtils.reformatCSVString(value) val csvOpType: String = StringUtils.reformatCSVString(opType.toString) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala index ef03faa64..0e17d8f0d 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ops/UnsupportedExprOpRef.scala @@ -16,10 +16,26 @@ package com.nvidia.spark.rapids.tool.planparser.ops + +/** + * Represents a reference to an unsupported expression operator. + * Extends `OperatorRefBase` and includes a reason why the expression is unsupported. + * + * @param opRef The underlying `OpRef` for the expression. + * @param unsupportedReason A string describing why the expression is unsupported. + */ case class UnsupportedExprOpRef(opRef: OpRef, unsupportedReason: String) extends OperatorRefBase(opRef.value, opRef.opType) +// Provides a factory method to create an instance from an expression name and unsupported reason. object UnsupportedExprOpRef { + /** + * Creates an `UnsupportedExprOpRef` for the given expression name and unsupported reason. + * + * @param exprName The name of the unsupported expression. + * @param unsupportedReason A string describing why the expression is unsupported. + * @return An instance of `UnsupportedExprOpRef`. + */ def apply(exprName: String, unsupportedReason: String): UnsupportedExprOpRef = { val opRef = OpRef.fromExpr(exprName) UnsupportedExprOpRef(opRef, unsupportedReason)