From 89361b1417aab665400ab4ecbd0cde1cec1346e6 Mon Sep 17 00:00:00 2001 From: Niranjan Artal <50492963+nartal1@users.noreply.github.com> Date: Mon, 16 Oct 2023 09:32:44 -0700 Subject: [PATCH] Qualification tool: Add penalty for row conversions (#471) * Qualification tool: Add penalty for row conversions Signed-off-by: Niranjan Artal * optimize code * addressed review comments * addressed review comments and added test Signed-off-by: Niranjan Artal * fix unit test * addressed review comments * addressed review comments and updated test results Signed-off-by: Niranjan Artal * Address review comments Signed-off-by: Niranjan Artal * address review comments * update tests * Revert "update tests" This reverts commit 7e1ebe032e9430eacda1587fbfb854b750dd8480. * add penalty to durations * change transitiontime calculation * update variable name * addressed review comments * change penaly percentage --------- Signed-off-by: Niranjan Artal --- .../tool/qualification/QualOutputWriter.scala | 7 +- .../tool/qualification/Qualification.scala | 5 +- .../qualification/QualificationArgs.scala | 7 ++ .../qualification/QualificationMain.scala | 3 +- .../qualification/QualificationAppInfo.scala | 113 ++++++++++++++++-- .../QualificationEventProcessor.scala | 14 ++- .../tool/planparser/SqlPlanParserSuite.scala | 47 +++++++- 7 files changed, 178 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala index 9953893c7..d4eff43fc 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala @@ -391,6 +391,7 @@ object QualOutputWriter { val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup" val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved" val STAGE_ESTIMATED_STR = "Stage Estimated" + val NUM_TRANSITIONS = "Number of transitions from or to GPU" val UNSUPPORTED_EXECS = "Unsupported Execs" val UNSUPPORTED_EXPRS = "Unsupported Expressions" val CLUSTER_TAGS = "Cluster Tags" @@ -856,7 +857,8 @@ object QualOutputWriter { AVERAGE_SPEEDUP_STR -> AVERAGE_SPEEDUP_STR.size, STAGE_DUR_STR -> STAGE_DUR_STR.size, UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size, - STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size + STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size, + NUM_TRANSITIONS -> NUM_TRANSITIONS.size ) detailedHeadersAndFields } @@ -878,7 +880,8 @@ object QualOutputWriter { headersAndSizes(AVERAGE_SPEEDUP_STR), info.stageTaskTime.toString -> headersAndSizes(STAGE_DUR_STR), info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR), - info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR)) + info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR), + info.numTransitions.toString -> headersAndSizes(NUM_TRANSITIONS)) constructOutputRow(data, delimiter, prettyPrint) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index f7b624957..e733d73c2 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -34,7 +34,8 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration, timeout: Option[Long], nThreads: Int, order: String, pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean, printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean, - reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean) extends Logging { + reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean, + penalizeTransitions: Boolean) extends Logging { private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]() @@ -166,7 +167,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration, try { val startTime = System.currentTimeMillis() val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker, - reportSqlLevel, mlOpsEnabled) + reportSqlLevel, mlOpsEnabled, penalizeTransitions) val qualAppResult = appResult match { case Left(errorMessage: String) => // Case when an error occurred during QualificationAppInfo creation diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index 76a2fba8e..c73d4cc0f 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -112,6 +112,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[Boolean](required = false, descr = "Whether to parse ML functions in the eventlogs. Default is false.", default = Some(false)) + val penalizeTransitions: ScallopOption[Boolean] = + toggle("penalize-transitions", + default = Some(true), + prefix = "no-", + descrYes = "Add penalty for ColumnarToRow and RowToColumnar transitions. " + + "Enabled by default.", + descrNo = "Do not add penalty for ColumnarToRow and RowToColumnar transitions.") val sparkProperty: ScallopOption[List[String]] = opt[List[String]](required = false, descr = "Filter applications based on certain Spark properties that were set during " + diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 0510b03f0..cb8a3c583 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -60,6 +60,7 @@ object QualificationMain extends Logging { val reportSqlLevel = appArgs.perSql.getOrElse(false) val platform = appArgs.platform.getOrElse("onprem") val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false) + val penalizeTransitions = appArgs.penalizeTransitions.getOrElse(true) val hadoopConf = RapidsToolsConfUtil.newHadoopConf @@ -93,7 +94,7 @@ object QualificationMain extends Logging { val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout, nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled, - enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled) + enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions) val res = qual.qualifyApps(filteredLogs) (0, res) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index fe8ae7fc6..028c904ed 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.rapids.tool.qualification +import java.util.concurrent.TimeUnit + import scala.collection.mutable.{ArrayBuffer, HashMap} import com.nvidia.spark.rapids.tool.EventLogInfo @@ -37,7 +39,8 @@ class QualificationAppInfo( pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, perSqlOnly: Boolean = false, - mlOpsEnabled: Boolean = false) + mlOpsEnabled: Boolean = false, + penalizeTransitions: Boolean = true) extends AppBase(eventLogInfo, hadoopConf) with Logging { var appId: String = "" @@ -51,6 +54,7 @@ class QualificationAppInfo( HashMap.empty[Long, StageTaskQualificationSummary] val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] = HashMap.empty[Long, StageTaskQualificationSummary] + val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int] val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long] val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]] @@ -147,8 +151,10 @@ class QualificationAppInfo( // Look at the total task times for all jobs/stages that aren't SQL or // SQL but dataset or rdd - private def calculateNonSQLTaskDataframeDuration(taskDFDuration: Long): Long = { - val allTaskTime = stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum + private def calculateNonSQLTaskDataframeDuration( + taskDFDuration: Long, + totalTransitionTime: Long): Long = { + val allTaskTime = stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum + totalTransitionTime val res = allTaskTime - taskDFDuration assert(res >= 0) res @@ -241,13 +247,87 @@ class QualificationAppInfo( stages.map { stageId => val stageTaskTime = stageIdToTaskEndSum.get(stageId) .map(_.totalTaskDuration).getOrElse(0L) + val numTransitions = penalizeTransitions match { + case true => stageIdToGpuCpuTransitions.getOrElse(stageId, 0) + case false => 0 + } + val transitionsTime = numTransitions match { + case 0 => 0L // no transitions + case gpuCpuTransitions => + // Duration to transfer data from GPU to CPU and vice versa. + // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be + // spilled to disk. + // Duration in Spark metrics is in milliseconds and CPU-GPU transfer rate is in bytes/sec. + // So we need to convert the transitions time to milliseconds. + val totalBytesRead = { + stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L) + } + if (totalBytesRead > 0) { + val transitionTime = (totalBytesRead / + QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * gpuCpuTransitions + (transitionTime * 1000).toLong // convert to milliseconds + } else { + 0L + } + + case _ => 0L + } + val finalEachStageUnsupported = if (transitionsTime != 0) { + // Add 50% penalty for unsupported duration if there are transitions. This number + // was randomly picked because it matched roughly what we saw on the experiments + // with customer/nds event logs + (eachStageUnsupported * 0.5 + eachStageUnsupported).toLong + } else { + eachStageUnsupported + } + StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime, - eachStageUnsupported, estimated) + finalEachStageUnsupported, numTransitions, transitionsTime, estimated) }.toSet } + private def calculateNumberOfTransitions(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Unit = { + allStagesToExecs.foreach { case (stageId, execs) => + // Flatten all the Execs within a stage. + // Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange + // will be flattened to Exchange;Sort;Exchange;Sort;SortMergeJoin;SortMergeJoin;Exchange; + val allExecs = execs.map(x => if (x.exec.startsWith("WholeStage")) { + x.children.getOrElse(Seq.empty) + } else { + Seq(x) + }).flatten.reverse + + // If it's a shuffle stage, then we need to keep the first and last Exchange and remove + // all the intermediate Exchanges as input size is captured in Exchange node. + val dedupedExecs = if (allExecs.size > 2) { + allExecs.head +: + allExecs.tail.init.filter(x => x.exec != "Exchange") :+ allExecs.last + } else { + allExecs + } + // Create a list of transitions by zipping allExecs with itself but with the first element + // This will create a list of adjacent pairs. + // Example: If allExecs = (ScanExec, FilterExec, SortExec, ProjectExec), then it will create + // a list of tuples as follows: + // (ScanExec, FilterExec), (FilterExec, SortExec), (SortExec, ProjectExec) + val transitions = dedupedExecs.zip(dedupedExecs.drop(1)).count { + // If the current execution (currExec) is supported, and the next execution (nextExec) + // is not supported, or if the current execution is not supported and the next execution + // is supported, then we consider this as a transition. + case (currExec, nextExec) => (currExec.isSupported && !nextExec.isSupported) || + (!currExec.isSupported && nextExec.isSupported) + } + stageIdToGpuCpuTransitions(stageId) = transitions + } + } + def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = { val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos) + + // Get the total number of transitions between CPU and GPU for each stage and + // store it in a Map. + calculateNumberOfTransitions(allStagesToExecs) + if (allStagesToExecs.isEmpty) { // use job level // also get the job ids associated with the SQLId @@ -302,7 +382,7 @@ class QualificationAppInfo( val numUnsupportedExecs = execInfos.filterNot(_.isSupported).size // This is a guestimate at how much wall clock was supported val numExecs = execInfos.size.toDouble - val numSupportedExecs = (numExecs - numUnsupportedExecs).toDouble + val numSupportedExecs = (numExecs - numUnsupportedExecs) val ratio = numSupportedExecs / numExecs val estimateWallclockSupported = (sqlWallClockDuration * ratio).toInt // don't worry about supported execs for these are these are mostly indicator of I/O @@ -428,11 +508,12 @@ class QualificationAppInfo( val allStagesSummary = perSqlStageSummary.flatMap(_.stageSum) .map(sum => sum.stageId -> sum).toMap.values.toSeq val sqlDataframeTaskDuration = allStagesSummary.map(s => s.stageTaskTime).sum + val totalTransitionsTime = allStagesSummary.map(s => s.transitionTime).sum val unsupportedSQLTaskDuration = calculateSQLUnsupportedTaskDuration(allStagesSummary) val endDurationEstimated = this.appEndTime.isEmpty && appDuration > 0 val jobOverheadTime = calculateJobOverHeadTime(info.startTime) val nonSQLDataframeTaskDuration = - calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration) + calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration, totalTransitionsTime) val nonSQLTaskDuration = nonSQLDataframeTaskDuration + jobOverheadTime // note that these ratios are based off the stage times which may be missing some stage // overhead or execs that didn't have associated stages @@ -488,8 +569,11 @@ class QualificationAppInfo( } // get the ratio based on the Task durations that we will use for wall clock durations + // totalTransitionTime is the overhead time for ColumnarToRow/RowToColumnar transitions + // which impacts the GPU ratio. val estimatedGPURatio = if (sqlDataframeTaskDuration > 0) { - supportedSQLTaskDuration.toDouble / sqlDataframeTaskDuration.toDouble + supportedSQLTaskDuration.toDouble / ( + sqlDataframeTaskDuration.toDouble + totalTransitionsTime.toDouble) } else { 1 } @@ -670,7 +754,8 @@ class StageTaskQualificationSummary( val stageAttemptId: Int, var executorRunTime: Long, var executorCPUTime: Long, - var totalTaskDuration: Long) + var totalTaskDuration: Long, + var totalbytesRead: Long) case class QualApplicationInfo( appName: String, @@ -736,6 +821,8 @@ case class StageQualSummaryInfo( averageSpeedup: Double, stageTaskTime: Long, unsupportedTaskDur: Long, + numTransitions: Int, + transitionTime: Long, estimated: Boolean = false) object QualificationAppInfo extends Logging { @@ -746,6 +833,11 @@ object QualificationAppInfo extends Logging { val NOT_APPLICABLE = "Not Applicable" val LOWER_BOUND_RECOMMENDED = 1.3 val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5 + // Below is the total time taken whenever there are ColumnarToRow or RowToColumnar transitions + // This includes the time taken to convert the data from one format to another and the time taken + // to transfer the data from CPU to GPU and vice versa. Current transfer rate is 1GB/s and is + // based on the testing on few candidate eventlogs. + val CPU_GPU_TRANSFER_RATE = 1000000000L private def handleException(e: Exception, path: EventLogInfo): String = { val message: String = e match { @@ -838,10 +930,11 @@ object QualificationAppInfo extends Logging { hadoopConf: Configuration, pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, - mlOpsEnabled: Boolean): Either[String, QualificationAppInfo] = { + mlOpsEnabled: Boolean, + penalizeTransitions: Boolean): Either[String, QualificationAppInfo] = { try { val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker, - reportSqlLevel, false, mlOpsEnabled) + reportSqlLevel, false, mlOpsEnabled, penalizeTransitions) logInfo(s"${path.eventLog.toString} has App: ${app.appId}") Right(app) } catch { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala index 42b0345c1..390251177 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala @@ -72,16 +72,26 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean super.doSparkListenerTaskEnd(app, event) // keep all stage task times to see for nonsql duration val taskSum = app.stageIdToTaskEndSum.getOrElseUpdate(event.stageId, { - new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0) + new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0) }) taskSum.executorRunTime += event.taskMetrics.executorRunTime taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime) taskSum.totalTaskDuration += event.taskInfo.duration + // Add the total bytes read from the task if it's available. This is from inputMetrics if + // it is reading from datasource, or shuffleReadMetrics if it is reading from shuffle. + val inputMetrics = event.taskMetrics.inputMetrics + if (inputMetrics != null) { + taskSum.totalbytesRead += inputMetrics.bytesRead + } + val shuffleReadMetrics = event.taskMetrics.shuffleReadMetrics + if (shuffleReadMetrics != null) { + taskSum.totalbytesRead += shuffleReadMetrics.totalBytesRead + } // Adds in everything (including failures) app.stageIdToSqlID.get(event.stageId).foreach { sqlID => val taskSum = app.sqlIDToTaskEndSum.getOrElseUpdate(sqlID, { - new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0) + new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0) }) taskSum.executorRunTime += event.taskMetrics.executorRunTime taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index 3a414b8c1..f8d372995 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -76,7 +76,7 @@ class SQLPlanParserSuite extends BaseTestSuite { val pluginTypeChecker = new PluginTypeChecker() assert(allEventLogs.size == 1) val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf, - pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false) + pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true) appResult match { case Right(app) => app case Left(_) => throw new AssertionError("Cannot create application") @@ -217,6 +217,51 @@ class SQLPlanParserSuite extends BaseTestSuite { } } + test("Parse Execs within WholeStageCodeGen in Order") { + TrampolineUtil.withTempDir { eventLogDir => + val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, + "Execs within WSCG ") { spark => + import spark.implicits._ + val df = Seq(("foo", 1L, 1.2), ("foo", 2L, 2.2), ("bar", 2L, 3.2), + ("bar", 2L, 4.2)).toDF("x", "y", "z") + df.cube($"x", ceil($"y")).count + } + val pluginTypeChecker = new PluginTypeChecker() + val app = createAppFromEventlog(eventLog) + assert(app.sqlPlans.size == 1) + app.sqlPlans.foreach { case (sqlID, plan) => + val planInfo = SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", + pluginTypeChecker, app) + val allExecInfo = planInfo.execInfo + val expectedAllExecInfoSize = if (ToolUtils.isSpark320OrLater()) { + // AdaptiveSparkPlan, WholeStageCodegen, AQEShuffleRead, Exchange, WholeStageCodegen + 5 + } else { + // WholeStageCodegen, Exchange, WholeStageCodegen + 3 + } + val wholeStages = planInfo.execInfo.filter(_.exec.contains("WholeStageCodegen")) + assert(wholeStages.size == 2) + // Expanding the children of WholeStageCodegen + val allExecs = allExecInfo.map(x => if (x.exec.startsWith("WholeStage")) { + x.children.getOrElse(Seq.empty) + } else { + Seq(x) + }).flatten.reverse + val expectedOrder = if (ToolUtils.isSpark320OrLater()) { + // Order should be: LocalTableScan, Expand, HashAggregate, Exchange, + // AQEShuffleRead, HashAggregate, AdaptiveSparkPlan + Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "AQEShuffleRead", + "HashAggregate", "AdaptiveSparkPlan") + } else { + // Order should be: LocalTableScan, Expand, HashAggregate, Exchange, HashAggregate + Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "HashAggregate") + } + assert(allExecs.map(_.exec) == expectedOrder) + } + } + } + test("HashAggregate") { TrampolineUtil.withTempDir { eventLogDir => val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,