From 5008ad85836c53e6d1472c55b4d5a4e881d3dd89 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 1 Aug 2023 16:43:13 -0700 Subject: [PATCH 01/16] Qualification tool: Add penalty for row conversions Signed-off-by: Niranjan Artal --- .../qualification/QualificationAppInfo.scala | 55 ++++++++++++++++++- .../QualificationEventProcessor.scala | 14 ++++- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index ba47ca89b..20e7096b2 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -51,6 +51,7 @@ class QualificationAppInfo( HashMap.empty[Long, StageTaskQualificationSummary] val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] = HashMap.empty[Long, StageTaskQualificationSummary] + val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int] val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long] val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]] @@ -241,13 +242,60 @@ class QualificationAppInfo( stages.map { stageId => val stageTaskTime = stageIdToTaskEndSum.get(stageId) .map(_.totalTaskDuration).getOrElse(0L) + val transitionsTime = if (stageIdToGpuCpuTransitions.get(stageId).isDefined) { + val gpuCpuTransitions = stageIdToGpuCpuTransitions.get(stageId).get + if (gpuCpuTransitions > 0) { + val totalBytesRead = + stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L).toDouble + // Duration to transfer data from GPU to CPU and vice versa. + // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be + // spilled to disk. + // Duration in Spark metrics is in millisecond, so multiply this by 1000 to make + // it consistent + if (totalBytesRead > 0) { + val fallback_duration = + (totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * + QualificationAppInfo.SECONDS_TO_MILLISECONDS * gpuCpuTransitions + fallback_duration.toLong + } else { + 0L + } + } else { + 0L + } + } else { + 0L + } + // Update totaltaskduration of stageIdToTaskEndSum to include transitions time + val stageIdToTasksMetrics = stageIdToTaskEndSum.get(stageId).orElse(None) + if (stageIdToTasksMetrics.isDefined) { + stageIdToTasksMetrics.get.totalTaskDuration += transitionsTime + } StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime, - eachStageUnsupported, estimated) + eachStageUnsupported + transitionsTime, estimated) }.toSet } def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = { val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos) + + // Get the total number of transitions between CPU and GPU for each stage and + // store it in a Map. 
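A minimal sketch, outside the patch itself, of what the loop that follows computes: each flag stands for one exec of a stage in plan order, true meaning it is supported on the GPU. The helper name is illustrative and not part of the tool; the pair test a != b is just the boolean form of the (supported && !supported) || (!supported && supported) check the patch spells out.

    def countGpuCpuTransitions(supported: Seq[Boolean]): Int =
      if (supported.size > 1) {
        // a transition is any adjacent pair of execs that disagree on GPU support
        supported.sliding(2).count { case Seq(a, b) => a != b }
      } else {
        0
      }

    // countGpuCpuTransitions(Seq(true, true, false, true)) == 2
    // (one GPU-to-CPU fallback, then one CPU-back-to-GPU)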
+ allStagesToExecs.foreach { case (stageId, execs) => + val topLevelExecs = execs.filterNot( + x => x.exec.startsWith("WholeStage")) + val childrenExecs = execs.flatMap { e => + e.children.map(x => x) + }.flatten + val allExecs = topLevelExecs ++ childrenExecs + val transitions = if (allExecs.size > 1) { + allExecs.sliding(2).count(x => (x(0).isSupported && !x(1).isSupported) + || (!x(0).isSupported && x(1).isSupported)) + } else { + 0 + } + stageIdToGpuCpuTransitions(stageId) = transitions + } if (allStagesToExecs.isEmpty) { // use job level // also get the job ids associated with the SQLId @@ -670,7 +718,8 @@ class StageTaskQualificationSummary( val stageAttemptId: Int, var executorRunTime: Long, var executorCPUTime: Long, - var totalTaskDuration: Long) + var totalTaskDuration: Long, + var totalbytesRead: Long) case class QualApplicationInfo( appName: String, @@ -739,6 +788,8 @@ object QualificationAppInfo extends Logging { val NOT_APPLICABLE = "Not Applicable" val LOWER_BOUND_RECOMMENDED = 1.3 val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5 + val CPU_GPU_TRANSFER_RATE = 10000000L + val SECONDS_TO_MILLISECONDS = 1000L private def handleException(e: Exception, path: EventLogInfo): Unit = { val message: String = e match { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala index 42b0345c1..390251177 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala @@ -72,16 +72,26 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean super.doSparkListenerTaskEnd(app, event) // keep all stage task times to see for nonsql duration val taskSum = app.stageIdToTaskEndSum.getOrElseUpdate(event.stageId, { - new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0) + new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0) }) taskSum.executorRunTime += event.taskMetrics.executorRunTime taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime) taskSum.totalTaskDuration += event.taskInfo.duration + // Add the total bytes read from the task if it's available. This is from inputMetrics if + // it is reading from datasource, or shuffleReadMetrics if it is reading from shuffle. 
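A condensed, standalone sketch of the accumulation the next lines perform; only Spark's SparkListenerTaskEnd event type is assumed, and the helper name is illustrative rather than part of the tool:

    import org.apache.spark.scheduler.SparkListenerTaskEnd

    // Bytes that would have to cross the CPU/GPU boundary for this task:
    // datasource input plus shuffle reads, with the same null guards as below.
    def taskBytesRead(event: SparkListenerTaskEnd): Long = {
      val metrics = event.taskMetrics
      val input = Option(metrics.inputMetrics).map(_.bytesRead).getOrElse(0L)
      val shuffle = Option(metrics.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L)
      input + shuffle
    }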
+ val inputMetrics = event.taskMetrics.inputMetrics + if (inputMetrics != null) { + taskSum.totalbytesRead += inputMetrics.bytesRead + } + val shuffleReadMetrics = event.taskMetrics.shuffleReadMetrics + if (shuffleReadMetrics != null) { + taskSum.totalbytesRead += shuffleReadMetrics.totalBytesRead + } // Adds in everything (including failures) app.stageIdToSqlID.get(event.stageId).foreach { sqlID => val taskSum = app.sqlIDToTaskEndSum.getOrElseUpdate(sqlID, { - new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0) + new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0) }) taskSum.executorRunTime += event.taskMetrics.executorRunTime taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime) From af962b8002f76406cb2f1890094630d51ad4d227 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Wed, 2 Aug 2023 14:17:21 -0700 Subject: [PATCH 02/16] optimize code --- .../qualification/QualificationAppInfo.scala | 35 +++++++------------ 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index ccbace9a3..2e76b1bb4 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -242,29 +242,23 @@ class QualificationAppInfo( stages.map { stageId => val stageTaskTime = stageIdToTaskEndSum.get(stageId) .map(_.totalTaskDuration).getOrElse(0L) - val transitionsTime = if (stageIdToGpuCpuTransitions.get(stageId).isDefined) { - val gpuCpuTransitions = stageIdToGpuCpuTransitions.get(stageId).get - if (gpuCpuTransitions > 0) { - val totalBytesRead = - stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L).toDouble + val transitionsTime = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) match { + case gpuCpuTransitions if gpuCpuTransitions > 0 => // Duration to transfer data from GPU to CPU and vice versa. // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be // spilled to disk. // Duration in Spark metrics is in millisecond, so multiply this by 1000 to make // it consistent + val totalBytesRead = + stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L).toDouble if (totalBytesRead > 0) { - val fallback_duration = - (totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * - QualificationAppInfo.SECONDS_TO_MILLISECONDS * gpuCpuTransitions + val fallback_duration = (totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * + QualificationAppInfo.SECONDS_TO_MILLISECONDS * gpuCpuTransitions fallback_duration.toLong } else { 0L } - } else { - 0L - } - } else { - 0L + case _ => 0L } // Update totaltaskduration of stageIdToTaskEndSum to include transitions time val stageIdToTasksMetrics = stageIdToTaskEndSum.get(stageId).orElse(None) @@ -282,17 +276,12 @@ class QualificationAppInfo( // Get the total number of transitions between CPU and GPU for each stage and // store it in a Map. 
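The rewritten loop below swaps sliding(2) for a zip against the shifted sequence; a quick standalone check, with illustrative values, that the two formulations count the same adjacent pairs:

    val flags = Seq(true, false, false, true)  // isSupported per exec, in stage order
    val bySliding = flags.sliding(2).count { case Seq(a, b) => a != b }
    val byZip = flags.zip(flags.drop(1)).count { case (a, b) => a != b }
    assert(bySliding == byZip)  // both count 2 transitions; zip also copes with a
                                // single-element list without an explicit size guard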
allStagesToExecs.foreach { case (stageId, execs) => - val topLevelExecs = execs.filterNot( - x => x.exec.startsWith("WholeStage")) - val childrenExecs = execs.flatMap { e => - e.children.map(x => x) - }.flatten + val topLevelExecs = execs.filterNot(x => x.exec.startsWith("WholeStage")) + val childrenExecs = execs.flatMap(_.children).flatten val allExecs = topLevelExecs ++ childrenExecs - val transitions = if (allExecs.size > 1) { - allExecs.sliding(2).count(x => (x(0).isSupported && !x(1).isSupported) - || (!x(0).isSupported && x(1).isSupported)) - } else { - 0 + val transitions = allExecs.zip(allExecs.drop(1)).count { + case (exec1, exec2) => + (exec1.isSupported && !exec2.isSupported) || (!exec1.isSupported && exec2.isSupported) } stageIdToGpuCpuTransitions(stageId) = transitions } From 1ae0eaa621c918048356fe4ee5d29cfadeeee8e8 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 7 Aug 2023 18:31:57 -0700 Subject: [PATCH 03/16] addressed review comments --- .../tool/qualification/QualOutputWriter.scala | 7 +++- .../qualification/QualificationAppInfo.scala | 40 +++++++++++++------ 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala index 9953893c7..704136422 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala @@ -391,6 +391,7 @@ object QualOutputWriter { val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup" val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved" val STAGE_ESTIMATED_STR = "Stage Estimated" + val NUM_TRANSITIONS = "Number of CPU-GPU Transitions" val UNSUPPORTED_EXECS = "Unsupported Execs" val UNSUPPORTED_EXPRS = "Unsupported Expressions" val CLUSTER_TAGS = "Cluster Tags" @@ -856,7 +857,8 @@ object QualOutputWriter { AVERAGE_SPEEDUP_STR -> AVERAGE_SPEEDUP_STR.size, STAGE_DUR_STR -> STAGE_DUR_STR.size, UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size, - STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size + STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size, + NUM_TRANSITIONS -> NUM_TRANSITIONS.size ) detailedHeadersAndFields } @@ -878,7 +880,8 @@ object QualOutputWriter { headersAndSizes(AVERAGE_SPEEDUP_STR), info.stageTaskTime.toString -> headersAndSizes(STAGE_DUR_STR), info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR), - info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR)) + info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR), + info.numTransitions.toString -> headersAndSizes(NUM_TRANSITIONS)) constructOutputRow(data, delimiter, prettyPrint) } } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 2e76b1bb4..526714630 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.rapids.tool.qualification +import java.util.concurrent.TimeUnit + import scala.collection.mutable.{ArrayBuffer, HashMap} import com.nvidia.spark.rapids.tool.EventLogInfo @@ -242,19 +244,19 @@ class QualificationAppInfo( stages.map { stageId => val stageTaskTime = 
stageIdToTaskEndSum.get(stageId) .map(_.totalTaskDuration).getOrElse(0L) - val transitionsTime = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) match { + val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) + val transitionsTime = numTransitions match { case gpuCpuTransitions if gpuCpuTransitions > 0 => // Duration to transfer data from GPU to CPU and vice versa. // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be // spilled to disk. - // Duration in Spark metrics is in millisecond, so multiply this by 1000 to make - // it consistent + // Duration in Spark metrics is in milliseconds and CPU-GPU transfer rate is in bytes/sec. + // So we need to convert the transitions time to milliseconds. val totalBytesRead = - stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L).toDouble + stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L) if (totalBytesRead > 0) { - val fallback_duration = (totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * - QualificationAppInfo.SECONDS_TO_MILLISECONDS * gpuCpuTransitions - fallback_duration.toLong + TimeUnit.SECONDS.toMillis( + totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * gpuCpuTransitions } else { 0L } @@ -266,7 +268,7 @@ class QualificationAppInfo( stageIdToTasksMetrics.get.totalTaskDuration += transitionsTime } StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime, - eachStageUnsupported + transitionsTime, estimated) + eachStageUnsupported + transitionsTime, estimated, numTransitions) }.toSet } @@ -279,9 +281,17 @@ class QualificationAppInfo( val topLevelExecs = execs.filterNot(x => x.exec.startsWith("WholeStage")) val childrenExecs = execs.flatMap(_.children).flatten val allExecs = topLevelExecs ++ childrenExecs + // Create a list of transitions by zipping allExecs with itself but with the first element + // This will create a list of adjacent pairs. + // Example: If allExecs = (ScanExec, FilterExec, SortExec, ProjectExec), then it will create + // a list of tuples as follows: + // (ScanExec, FilterExec), (FilterExec, SortExec), (SortExec, ProjectExec) val transitions = allExecs.zip(allExecs.drop(1)).count { - case (exec1, exec2) => - (exec1.isSupported && !exec2.isSupported) || (!exec1.isSupported && exec2.isSupported) + // If the current execution (currExec) is supported, and the next execution (nextExec) + // is not supported, or if the current execution is not supported and the next execution + // is supported, then we consider this as a transition. + case (currExec, nextExec) => (currExec.isSupported && !nextExec.isSupported) || + (!currExec.isSupported && nextExec.isSupported) } stageIdToGpuCpuTransitions(stageId) = transitions } @@ -774,7 +784,8 @@ case class StageQualSummaryInfo( averageSpeedup: Double, stageTaskTime: Long, unsupportedTaskDur: Long, - estimated: Boolean = false) + estimated: Boolean = false, + numTransitions: Int) object QualificationAppInfo extends Logging { // define recommendation constants @@ -784,8 +795,13 @@ object QualificationAppInfo extends Logging { val NOT_APPLICABLE = "Not Applicable" val LOWER_BOUND_RECOMMENDED = 1.3 val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5 + // Below is the total time taken whenever there are ColumnarToRow or RowToColumnar transitions + // This includes the time taken to convert the data from one format to another and the time taken + // to transfer the data from CPU to GPU and vice versa. Current transfer rate is 10MB/s and is + // based on the testing on few eventlogs. 
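+  // (For scale, assuming that 10 MB/s figure: a stage that moves 1 GB across the
+  // CPU-GPU boundary is charged roughly 100 seconds of penalty per transition.)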
+ // TODO: Need to test this on more eventlogs including NDS queries + // and come up with a better transfer rate. val CPU_GPU_TRANSFER_RATE = 10000000L - val SECONDS_TO_MILLISECONDS = 1000L private def handleException(e: Exception, path: EventLogInfo): String = { val message: String = e match { From e2dbb12ee8458264d685192b521b85ad2395b4ec Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 17 Aug 2023 19:20:40 -0700 Subject: [PATCH 04/16] addressed review comments and added test Signed-off-by: Niranjan Artal --- .../qualification/QualificationAppInfo.scala | 47 ++++++++++++++++--- .../jdbc_expectation.csv | 2 +- .../qual_test_simple_expectation.csv | 8 ++-- .../qual_test_simple_expectation_persql.csv | 28 +++++------ .../read_dsv1_expectation.csv | 2 +- .../read_dsv2_expectation.csv | 2 +- .../spark2_expectation.csv | 2 +- .../truncated_1_end_expectation.csv | 2 +- .../tool/planparser/SqlPlanParserSuite.scala | 35 ++++++++++++++ 9 files changed, 99 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 526714630..4794150e9 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -54,6 +54,7 @@ class QualificationAppInfo( val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] = HashMap.empty[Long, StageTaskQualificationSummary] val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int] + var execsNoStageTransitions: Int = 0 val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long] val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]] @@ -164,11 +165,11 @@ class QualificationAppInfo( } private def calculateSQLSupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = { - all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum + all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum - calculateNoExecsStageDurations(all) } private def calculateSQLUnsupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = { - all.map(_.unsupportedTaskDur).sum + all.map(_.unsupportedTaskDur).sum + calculateNoExecsStageDurations(all) } private def calculateSpeedupFactor(all: Seq[StageQualSummaryInfo]): Double = { @@ -177,6 +178,23 @@ class QualificationAppInfo( res } + private def calculateNoExecsStageDurations(all: Seq[StageQualSummaryInfo]): Long = { + // If there are Execs not associated with any stage, then some of the Execs may not be + // supported on GPU. We need to estimate the duration of these Execs and add it to the + // unsupportedTaskDur. We estimate the duration by taking the average of the unsupportedTaskDur + // of all the stages and multiplying it by the number of Execs that are not associated with + // any stage. We multiply with a penalty factor of 0.05 + // TODO: Need to come up with better heuristics for penalty factor. 
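+    // (Worked example of the formula below: stage unsupported durations of
+    // 1000, 2000 and 3000 ms average to 2000 ms, so four execs without a stage
+    // would add 4 * 2000 * 0.05 = 400 ms of estimated unsupported duration.)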
+ val unsupportedTasksize= all.map(_.unsupportedTaskDur).size + if (execsNoStageTransitions != 0 && unsupportedTasksize != 0) { + execsNoStageTransitions * ( + all.map(_.unsupportedTaskDur).sum / unsupportedTasksize) * 0.05 + }.toLong + else { + 0L + } + } + private def getAllReadFileFormats: Seq[String] = { dataSourceInfo.map { ds => s"${ds.format.toLowerCase()}[${ds.schema}]" @@ -278,15 +296,29 @@ class QualificationAppInfo( // Get the total number of transitions between CPU and GPU for each stage and // store it in a Map. allStagesToExecs.foreach { case (stageId, execs) => - val topLevelExecs = execs.filterNot(x => x.exec.startsWith("WholeStage")) - val childrenExecs = execs.flatMap(_.children).flatten - val allExecs = topLevelExecs ++ childrenExecs + // Flatten all the Execs within a stage. + // Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange + // will be flattened to Exchange;Sort;Exchange;Sort;SortMergeJoin;SortMergeJoin;Exchange; + val allExecs = execs.map(x => if (x.exec.startsWith("WholeStage")) { + x.children.getOrElse(Seq.empty) + } else { + Seq(x) + }).flatten.reverse + + // If it's a shuffle stage, then we need to keep the first and last Exchange and remove + // all the intermediate Exchanges as input size is captured in Exchange node. + val dedupedExecs = if (allExecs.size > 2) { + allExecs.head +: + allExecs.tail.init.filter(x => x.exec != "Exchange") :+ allExecs.last + } else { + allExecs + } // Create a list of transitions by zipping allExecs with itself but with the first element // This will create a list of adjacent pairs. // Example: If allExecs = (ScanExec, FilterExec, SortExec, ProjectExec), then it will create // a list of tuples as follows: // (ScanExec, FilterExec), (FilterExec, SortExec), (SortExec, ProjectExec) - val transitions = allExecs.zip(allExecs.drop(1)).count { + val transitions = dedupedExecs.zip(dedupedExecs.drop(1)).count { // If the current execution (currExec) is supported, and the next execution (nextExec) // is not supported, or if the current execution is not supported and the next execution // is supported, then we consider this as a transition. 
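        // (With the flattened example above, Exchange;Sort;Exchange;Sort;SortMergeJoin;
        // SortMergeJoin;Exchange dedups to Exchange;Sort;Sort;SortMergeJoin;SortMergeJoin;
        // Exchange: only the boundary Exchanges survive, and the pair count here
        // then runs over that deduped sequence.)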
@@ -295,6 +327,9 @@ class QualificationAppInfo( } stageIdToGpuCpuTransitions(stageId) = transitions } + if (execsNoStage.nonEmpty) { + execsNoStageTransitions += execsNoStage.filterNot(exec => exec.isSupported).size + } if (allStagesToExecs.isEmpty) { // use job level // also get the job ids associated with the SQLId diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv index eb8a72b10..8baceabd6 100644 --- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 +"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569387.57,2579.42,3627,19894,571967,3500,28.41,"","JDBC[*]","","","","",1812,544575,693,19201,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv index f43f88c8d..a9a0c66a4 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv @@ -1,5 +1,5 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 -"Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 -"Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1 -"Rapids Spark 
Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,0,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4664,2,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8477.87,7841.12,12434,132257,16319,10582,37.7,"","","JSON","","","",7143,4717,19691,112566,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 +"Spark shell","local-1651187225439","Not Recommended",1.0,355490.83,146.16,760,180,355637,333,87.88,"","JSON[string:bigint:int]","","","","",498,343411,101,79,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 +"Spark shell","local-1651188809790","Not Recommended",1.0,166213.92,1.07,911,283,166215,3,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,282,1,1.5,false,"CollectLimit;Scan json;Project","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,-151,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,5013,-347,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv index 33a0ceccd..7d4667863 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv @@ -1,18 +1,18 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated GPU Duration,Estimated GPU Speedup,Estimated GPU Time Saved,Recommendation -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2078.49,3.43,5064.5,"Strongly Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,800.56,2.56,1251.43,"Strongly Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,763.96,2.53,1169.03,"Strongly Recommended" -"Spark shell","local-1651187225439",0,"show at :26",498,249,373.5,1.33,124.5,"Recommended" -"Spark shell","local-1651188809790",1,"show at :26",196,98,147.0,1.33,49.0,"Recommended" -"Spark shell","local-1651187225439",1,"show at :26",262,60,240.54,1.08,21.45,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,187,1246.97,1.04,59.02,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,0,1209.0,1.0,0.0,"Not Recommended" -"Spark shell","local-1651188809790",0,"show at :26",715,2,715.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,0,321.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,0,129.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at 
QualificationInfoUtils.scala:136",127,0,127.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",6,"json at QualificationInfoUtils.scala:130",110,0,110.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",3,"json at QualificationInfoUtils.scala:130",108,0,108.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6714,2082.44,3.43,5060.55,"Strongly Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1655,804.22,2.55,1247.77,"Strongly Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1546,767.5,2.51,1165.49,"Strongly Recommended" +"Spark shell","local-1651188809790",1,"show at :26",196,90,150.76,1.3,45.23,"Recommended" +"Spark shell","local-1651187225439",0,"show at :26",498,226,384.81,1.29,113.18,"Not Recommended" +"Spark shell","local-1651187225439",1,"show at :26",262,40,247.69,1.05,14.3,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,131,1264.57,1.03,41.42,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,-543,1209.0,1.0,0.0,"Not Recommended" +"Spark shell","local-1651188809790",0,"show at :26",715,-66,715.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,-144,321.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,-57,129.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at QualificationInfoUtils.scala:136",127,-56,127.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",3,"json at QualificationInfoUtils.scala:130",108,-48,108.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",4,"createOrReplaceTempView at QualificationInfoUtils.scala:133",22,22,22.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",7,"createOrReplaceTempView at QualificationInfoUtils.scala:133",4,4,4.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",1,"createOrReplaceTempView at QualificationInfoUtils.scala:133",2,2,2.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",6,"json at QualificationInfoUtils.scala:130",110,-49,110.0,0.99,-0.01,"Not Recommended" diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv index f624cc260..2dfc1fe19 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus 
Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371544219","Not Recommended",1.0,174691.42,601.57,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 +"Spark shell","local-1624371544219","Not Recommended",1.0,174808.87,484.12,6695,20421,175293,832,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17882,2539,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 \ No newline at end of file diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv index 5cb64b660..2802a08ce 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371906627","Not Recommended",1.0,83172.84,565.15,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 +"Spark shell","local-1624371906627","Not Recommended",1.0,83316.93,421.06,6760,21802,83738,723,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,19467,2335,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv index 7510bbb42..9d4bfcbc2 100644 --- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1634253215009","Not Recommended",1.01,46352.24,710.75,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,3.36,false,"CollectLimit;Scan text","",30 +"Spark shell","local-1634253215009","Not 
Recommended",1.01,46361.16,701.83,1520,359,47063,999,67.64,"","Text[*]","","","","",1068,44935,123,236,3.36,false,"CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv index 07d08a0c3..9c6067163 100644 --- a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.09,4468.98,403.01,1306,14353,4872,570,62.67,"","","JSON","","","",1306,4477,8086,6267,3.41,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.08,4484.54,387.45,1306,14353,4872,548,62.67,"","","JSON","","","",1306,4477,8328,6025,3.41,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index a765c233c..197bd358f 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -217,6 +217,41 @@ class SQLPlanParserSuite extends BaseTestSuite { } } + test("Parse Execs within WholeStageCodeGen in Order") { + TrampolineUtil.withTempDir { eventLogDir => + val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, + "Execs within WSCG ") { spark => + import spark.implicits._ + val df = Seq(("foo", 1L, 1.2), ("foo", 2L, 2.2), ("bar", 2L, 3.2), + ("bar", 2L, 4.2)).toDF("x", "y", "z") + df.cube($"x", ceil($"y")).count // hex is not supported in GPU yet. 
+ } + val pluginTypeChecker = new PluginTypeChecker() + val app = createAppFromEventlog(eventLog) + assert(app.sqlPlans.size == 1) + app.sqlPlans.foreach { case (sqlID, plan) => + val planInfo = SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", + pluginTypeChecker, app) + val allExecInfo = planInfo.execInfo + // allExecInfo is : WholeStageCodegen, Exchange, WholeStageCodegen + assert(allExecInfo.size == 3) + val wholeStages = planInfo.execInfo.filter(_.exec.contains("WholeStageCodegen")) + assert(wholeStages.size == 2) + // Expanding the children of WholeStageCodegen + val allExecs = allExecInfo.map(x => if (x.exec.startsWith("WholeStage")) { + x.children.getOrElse(Seq.empty) + } else { + Seq(x) + }).flatten.reverse + // Order should be: LocalTableScan, Expand, HashAggregate, Exchange, HashAggregate + assert(allExecs.size == 5) + assert(allExecs(0).exec == "LocalTableScan") + assert(allExecs(1).exec == "Expand") + assert(allExecs(2).exec == "HashAggregate") + } + } + } + test("HashAggregate") { TrampolineUtil.withTempDir { eventLogDir => val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, From 29f0e8cb57fb05783fd2828462feaf6d19785256 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 21 Aug 2023 18:55:39 -0700 Subject: [PATCH 05/16] fix unit test --- .../tool/planparser/SqlPlanParserSuite.scala | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index 197bd358f..ec5877adb 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -224,7 +224,7 @@ class SQLPlanParserSuite extends BaseTestSuite { import spark.implicits._ val df = Seq(("foo", 1L, 1.2), ("foo", 2L, 2.2), ("bar", 2L, 3.2), ("bar", 2L, 4.2)).toDF("x", "y", "z") - df.cube($"x", ceil($"y")).count // hex is not supported in GPU yet. 
+ df.cube($"x", ceil($"y")).count } val pluginTypeChecker = new PluginTypeChecker() val app = createAppFromEventlog(eventLog) @@ -233,8 +233,13 @@ class SQLPlanParserSuite extends BaseTestSuite { val planInfo = SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app) val allExecInfo = planInfo.execInfo - // allExecInfo is : WholeStageCodegen, Exchange, WholeStageCodegen - assert(allExecInfo.size == 3) + val expectedAllExecInfoSize = if (ToolUtils.isSpark320OrLater()) { + // AdaptiveSparkPlan, WholeStageCodegen, AQEShuffleRead, Exchange, WholeStageCodegen + 5 + } else { + // WholeStageCodegen, Exchange, WholeStageCodegen + 3 + } val wholeStages = planInfo.execInfo.filter(_.exec.contains("WholeStageCodegen")) assert(wholeStages.size == 2) // Expanding the children of WholeStageCodegen @@ -243,11 +248,16 @@ class SQLPlanParserSuite extends BaseTestSuite { } else { Seq(x) }).flatten.reverse - // Order should be: LocalTableScan, Expand, HashAggregate, Exchange, HashAggregate - assert(allExecs.size == 5) - assert(allExecs(0).exec == "LocalTableScan") - assert(allExecs(1).exec == "Expand") - assert(allExecs(2).exec == "HashAggregate") + val expectedOrder = if (ToolUtils.isSpark320OrLater()) { + // Order should be: LocalTableScan, Expand, HashAggregate, Exchange, + // AQEShuffleRead, HashAggregate, AdaptiveSparkPlan + Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "AQEShuffleRead", + "HashAggregate", "AdaptiveSparkPlan") + } else { + // Order should be: LocalTableScan, Expand, HashAggregate, Exchange, HashAggregate + Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "HashAggregate") + } + assert(allExecs.map(_.exec) == expectedOrder) } } } From 7b6803fbb3a7ce97008c97482335b26568fa5d10 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 22 Aug 2023 17:48:02 -0700 Subject: [PATCH 06/16] addressed review comments --- .../rapids/tool/qualification/Qualification.scala | 5 +++-- .../tool/qualification/QualificationArgs.scala | 5 +++++ .../tool/qualification/QualificationMain.scala | 3 ++- .../tool/qualification/QualificationAppInfo.scala | 15 +++++++++++---- .../tool/planparser/SqlPlanParserSuite.scala | 2 +- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index f7b624957..0fa13fddb 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -34,7 +34,8 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration, timeout: Option[Long], nThreads: Int, order: String, pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean, printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean, - reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean) extends Logging { + reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean, + ignoreTransitions: Boolean) extends Logging { private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]() @@ -166,7 +167,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration, try { val startTime = System.currentTimeMillis() val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker, - reportSqlLevel, mlOpsEnabled) + reportSqlLevel, mlOpsEnabled, ignoreTransitions) val qualAppResult = appResult match { case 
Left(errorMessage: String) => // Case when an error occurred during QualificationAppInfo creation diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index 925001724..dda734f68 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -112,6 +112,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[Boolean](required = false, descr = "Whether to parse ML functions in the eventlogs. Default is false.", default = Some(false)) + val ignoreTransitions: ScallopOption[Boolean] = + opt[Boolean](required = false, + descr = "Whether to ignore durations for ColumnarToRow and RowToColumnar transitions " + + "in the eventlogs while calculating the speedup. Default is false.", + default = Some(false)) val sparkProperty: ScallopOption[List[String]] = opt[List[String]](required = false, descr = "Filter applications based on certain Spark properties that were set during " + diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 0510b03f0..905a62972 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -60,6 +60,7 @@ object QualificationMain extends Logging { val reportSqlLevel = appArgs.perSql.getOrElse(false) val platform = appArgs.platform.getOrElse("onprem") val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false) + val ignoreTransitions = appArgs.ignoreTransitions.getOrElse(false) val hadoopConf = RapidsToolsConfUtil.newHadoopConf @@ -93,7 +94,7 @@ object QualificationMain extends Logging { val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout, nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled, - enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled) + enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, ignoreTransitions) val res = qual.qualifyApps(filteredLogs) (0, res) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 4794150e9..468d34473 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -39,7 +39,8 @@ class QualificationAppInfo( pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, perSqlOnly: Boolean = false, - mlOpsEnabled: Boolean = false) + mlOpsEnabled: Boolean = false, + ignoreTransitions: Boolean = false) extends AppBase(eventLogInfo, hadoopConf) with Logging { var appId: String = "" @@ -262,8 +263,13 @@ class QualificationAppInfo( stages.map { stageId => val stageTaskTime = stageIdToTaskEndSum.get(stageId) .map(_.totalTaskDuration).getOrElse(0L) - val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) + val numTransitions = ignoreTransitions match { + case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0) + case true => 0 + } + // val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) val 
transitionsTime = numTransitions match { + case 0 => 0L // no transitions case gpuCpuTransitions if gpuCpuTransitions > 0 => // Duration to transfer data from GPU to CPU and vice versa. // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be @@ -929,10 +935,11 @@ object QualificationAppInfo extends Logging { hadoopConf: Configuration, pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, - mlOpsEnabled: Boolean): Either[String, QualificationAppInfo] = { + mlOpsEnabled: Boolean, + ignoreTransitions: Boolean): Either[String, QualificationAppInfo] = { try { val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker, - reportSqlLevel, false, mlOpsEnabled) + reportSqlLevel, false, mlOpsEnabled, ignoreTransitions) logInfo(s"${path.eventLog.toString} has App: ${app.appId}") Right(app) } catch { diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index ec5877adb..99a5200a4 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -76,7 +76,7 @@ class SQLPlanParserSuite extends BaseTestSuite { val pluginTypeChecker = new PluginTypeChecker() assert(allEventLogs.size == 1) val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf, - pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false) + pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, ignoreTransitions = false) appResult match { case Right(app) => app case Left(_) => throw new AssertionError("Cannot create application") From 2607489e47780c8e6fd5b096cf1a9330e476dd84 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Sun, 27 Aug 2023 19:04:54 -0700 Subject: [PATCH 07/16] addressed review comments and updated test results Signed-off-by: Niranjan Artal --- .../qualification/QualificationAppInfo.scala | 50 ++++++------------- .../jdbc_expectation.csv | 2 +- .../qual_test_simple_expectation.csv | 8 +-- .../qual_test_simple_expectation_persql.csv | 28 +++++------ .../read_dsv1_expectation.csv | 2 +- .../read_dsv2_expectation.csv | 2 +- .../spark2_expectation.csv | 2 +- .../truncated_1_end_expectation.csv | 2 +- 8 files changed, 37 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 468d34473..4833cdf92 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -55,7 +55,6 @@ class QualificationAppInfo( val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] = HashMap.empty[Long, StageTaskQualificationSummary] val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int] - var execsNoStageTransitions: Int = 0 val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long] val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]] @@ -152,8 +151,10 @@ class QualificationAppInfo( // Look at the total task times for all jobs/stages that aren't SQL or // SQL but dataset or rdd - private def calculateNonSQLTaskDataframeDuration(taskDFDuration: Long): Long = { - val allTaskTime = 
stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum + private def calculateNonSQLTaskDataframeDuration( + taskDFDuration: Long, + totalTransitionTime: Long): Long = { + val allTaskTime = stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum + totalTransitionTime val res = allTaskTime - taskDFDuration assert(res >= 0) res @@ -166,11 +167,11 @@ class QualificationAppInfo( } private def calculateSQLSupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = { - all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum - calculateNoExecsStageDurations(all) + all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum } private def calculateSQLUnsupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = { - all.map(_.unsupportedTaskDur).sum + calculateNoExecsStageDurations(all) + all.map(_.unsupportedTaskDur).sum } private def calculateSpeedupFactor(all: Seq[StageQualSummaryInfo]): Double = { @@ -179,23 +180,6 @@ class QualificationAppInfo( res } - private def calculateNoExecsStageDurations(all: Seq[StageQualSummaryInfo]): Long = { - // If there are Execs not associated with any stage, then some of the Execs may not be - // supported on GPU. We need to estimate the duration of these Execs and add it to the - // unsupportedTaskDur. We estimate the duration by taking the average of the unsupportedTaskDur - // of all the stages and multiplying it by the number of Execs that are not associated with - // any stage. We multiply with a penalty factor of 0.05 - // TODO: Need to come up with better heuristics for penalty factor. - val unsupportedTasksize= all.map(_.unsupportedTaskDur).size - if (execsNoStageTransitions != 0 && unsupportedTasksize != 0) { - execsNoStageTransitions * ( - all.map(_.unsupportedTaskDur).sum / unsupportedTasksize) * 0.05 - }.toLong - else { - 0L - } - } - private def getAllReadFileFormats: Seq[String] = { dataSourceInfo.map { ds => s"${ds.format.toLowerCase()}[${ds.schema}]" @@ -267,7 +251,6 @@ class QualificationAppInfo( case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0) case true => 0 } - // val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) val transitionsTime = numTransitions match { case 0 => 0L // no transitions case gpuCpuTransitions if gpuCpuTransitions > 0 => @@ -286,13 +269,8 @@ class QualificationAppInfo( } case _ => 0L } - // Update totaltaskduration of stageIdToTaskEndSum to include transitions time - val stageIdToTasksMetrics = stageIdToTaskEndSum.get(stageId).orElse(None) - if (stageIdToTasksMetrics.isDefined) { - stageIdToTasksMetrics.get.totalTaskDuration += transitionsTime - } StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime, - eachStageUnsupported + transitionsTime, estimated, numTransitions) + eachStageUnsupported + transitionsTime, numTransitions, transitionsTime, estimated) }.toSet } @@ -333,9 +311,6 @@ class QualificationAppInfo( } stageIdToGpuCpuTransitions(stageId) = transitions } - if (execsNoStage.nonEmpty) { - execsNoStageTransitions += execsNoStage.filterNot(exec => exec.isSupported).size - } if (allStagesToExecs.isEmpty) { // use job level // also get the job ids associated with the SQLId @@ -516,11 +491,12 @@ class QualificationAppInfo( val allStagesSummary = perSqlStageSummary.flatMap(_.stageSum) .map(sum => sum.stageId -> sum).toMap.values.toSeq val sqlDataframeTaskDuration = allStagesSummary.map(s => s.stageTaskTime).sum + val totalTransitionsTime = allStagesSummary.map(s=> s.transitionTime).sum val unsupportedSQLTaskDuration = calculateSQLUnsupportedTaskDuration(allStagesSummary) val 
endDurationEstimated = this.appEndTime.isEmpty && appDuration > 0 val jobOverheadTime = calculateJobOverHeadTime(info.startTime) val nonSQLDataframeTaskDuration = - calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration) + calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration, totalTransitionsTime) val nonSQLTaskDuration = nonSQLDataframeTaskDuration + jobOverheadTime // note that these ratios are based off the stage times which may be missing some stage // overhead or execs that didn't have associated stages @@ -577,7 +553,8 @@ class QualificationAppInfo( // get the ratio based on the Task durations that we will use for wall clock durations val estimatedGPURatio = if (sqlDataframeTaskDuration > 0) { - supportedSQLTaskDuration.toDouble / sqlDataframeTaskDuration.toDouble + supportedSQLTaskDuration.toDouble / ( + sqlDataframeTaskDuration.toDouble + totalTransitionsTime.toDouble) } else { 1 } @@ -825,8 +802,9 @@ case class StageQualSummaryInfo( averageSpeedup: Double, stageTaskTime: Long, unsupportedTaskDur: Long, - estimated: Boolean = false, - numTransitions: Int) + numTransitions: Int, + transitionTime: Long, + estimated: Boolean = false) object QualificationAppInfo extends Logging { // define recommendation constants diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv index 8baceabd6..eb8a72b10 100644 --- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569387.57,2579.42,3627,19894,571967,3500,28.41,"","JDBC[*]","","","","",1812,544575,693,19201,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 +"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv index a9a0c66a4..f43f88c8d 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv @@ -1,5 +1,5 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported 
SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8477.87,7841.12,12434,132257,16319,10582,37.7,"","","JSON","","","",7143,4717,19691,112566,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 -"Spark shell","local-1651187225439","Not Recommended",1.0,355490.83,146.16,760,180,355637,333,87.88,"","JSON[string:bigint:int]","","","","",498,343411,101,79,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 -"Spark shell","local-1651188809790","Not Recommended",1.0,166213.92,1.07,911,283,166215,3,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,282,1,1.5,false,"CollectLimit;Scan json;Project","UDF",1 -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,-151,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,5013,-347,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 +"Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 +"Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,0,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4664,2,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv index 7d4667863..33a0ceccd 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv @@ -1,18 +1,18 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated GPU Duration,Estimated GPU Speedup,Estimated GPU Time Saved,Recommendation -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6714,2082.44,3.43,5060.55,"Strongly Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1655,804.22,2.55,1247.77,"Strongly Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1546,767.5,2.51,1165.49,"Strongly Recommended" -"Spark shell","local-1651188809790",1,"show at :26",196,90,150.76,1.3,45.23,"Recommended" -"Spark shell","local-1651187225439",0,"show at 
:26",498,226,384.81,1.29,113.18,"Not Recommended" -"Spark shell","local-1651187225439",1,"show at :26",262,40,247.69,1.05,14.3,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,131,1264.57,1.03,41.42,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,-543,1209.0,1.0,0.0,"Not Recommended" -"Spark shell","local-1651188809790",0,"show at :26",715,-66,715.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,-144,321.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,-57,129.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at QualificationInfoUtils.scala:136",127,-56,127.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",3,"json at QualificationInfoUtils.scala:130",108,-48,108.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2078.49,3.43,5064.5,"Strongly Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,800.56,2.56,1251.43,"Strongly Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,763.96,2.53,1169.03,"Strongly Recommended" +"Spark shell","local-1651187225439",0,"show at :26",498,249,373.5,1.33,124.5,"Recommended" +"Spark shell","local-1651188809790",1,"show at :26",196,98,147.0,1.33,49.0,"Recommended" +"Spark shell","local-1651187225439",1,"show at :26",262,60,240.54,1.08,21.45,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,187,1246.97,1.04,59.02,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,0,1209.0,1.0,0.0,"Not Recommended" +"Spark shell","local-1651188809790",0,"show at :26",715,2,715.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,0,321.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,0,129.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at QualificationInfoUtils.scala:136",127,0,127.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",6,"json at QualificationInfoUtils.scala:130",110,0,110.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",3,"json at QualificationInfoUtils.scala:130",108,0,108.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",4,"createOrReplaceTempView at QualificationInfoUtils.scala:133",22,22,22.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",7,"createOrReplaceTempView at QualificationInfoUtils.scala:133",4,4,4.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",1,"createOrReplaceTempView at QualificationInfoUtils.scala:133",2,2,2.0,1.0,0.0,"Not Recommended" -"Rapids Spark Profiling Tool Unit 
Tests","local-1623281204390",6,"json at QualificationInfoUtils.scala:130",110,-49,110.0,0.99,-0.01,"Not Recommended" diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv index 2dfc1fe19..f624cc260 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371544219","Not Recommended",1.0,174808.87,484.12,6695,20421,175293,832,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17882,2539,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 \ No newline at end of file +"Spark shell","local-1624371544219","Not Recommended",1.0,174691.42,601.57,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv index 2802a08ce..5cb64b660 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371906627","Not Recommended",1.0,83316.93,421.06,6760,21802,83738,723,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,19467,2335,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 +"Spark shell","local-1624371906627","Not Recommended",1.0,83172.84,565.15,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv index 9d4bfcbc2..7510bbb42 100644 --- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv @@ -1,2 +1,2 @@ 
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1634253215009","Not Recommended",1.01,46361.16,701.83,1520,359,47063,999,67.64,"","Text[*]","","","","",1068,44935,123,236,3.36,false,"CollectLimit;Scan text","",30 +"Spark shell","local-1634253215009","Not Recommended",1.01,46352.24,710.75,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,3.36,false,"CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv index 9c6067163..07d08a0c3 100644 --- a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.08,4484.54,387.45,1306,14353,4872,548,62.67,"","","JSON","","","",1306,4477,8328,6025,3.41,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.09,4468.98,403.01,1306,14353,4872,570,62.67,"","","JSON","","","",1306,4477,8086,6267,3.41,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 From 2ba211a9c104fc041dc02e78f4d4628ccdde5343 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Sun, 8 Oct 2023 18:26:32 -0700 Subject: [PATCH 08/16] Address review comments Signed-off-by: Niranjan Artal --- .../qualification/QualificationAppInfo.scala | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 4833cdf92..322fc04e4 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -259,18 +259,26 @@ class QualificationAppInfo( // spilled to disk. // Duration in Spark metrics is in milliseconds and CPU-GPU transfer rate is in bytes/sec. // So we need to convert the transitions time to milliseconds. 
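          // For example (hypothetical numbers): a stage that read 5000000000 bytes
          // (5 GB) with 2 transitions at the assumed 1 GB/s rate is charged
          // TimeUnit.SECONDS.toMillis(5000000000L / CPU_GPU_TRANSFER_RATE) * 2 = 10000 ms.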
-          val totalBytesRead =
+          val totalBytesRead = {
             stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L)
+          }
           if (totalBytesRead > 0) {
-            TimeUnit.SECONDS.toMillis(
-              totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * gpuCpuTransitions
+            (TimeUnit.SECONDS.toMillis(totalBytesRead /
+              QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * gpuCpuTransitions)
           } else {
             0L
           }
+        case _ => 0L
       }
+      val finalEachStageUnsupported = if (transitionsTime != 0) {
+        (allStageTaskTime * numUnsupported.size / allFlattenedExecs.size.toDouble).toLong
+      } else {
+        eachStageUnsupported
+      }
+
       StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
-        eachStageUnsupported + transitionsTime, numTransitions, transitionsTime, estimated)
+        finalEachStageUnsupported, numTransitions, transitionsTime, estimated)
     }.toSet
   }
@@ -365,7 +373,7 @@ class QualificationAppInfo(
     val numUnsupportedExecs = execInfos.filterNot(_.isSupported).size
     // This is a guesstimate at how much wall clock was supported
     val numExecs = execInfos.size.toDouble
-    val numSupportedExecs = (numExecs - numUnsupportedExecs).toDouble
+    val numSupportedExecs = (numExecs - numUnsupportedExecs)
     val ratio = numSupportedExecs / numExecs
     val estimateWallclockSupported = (sqlWallClockDuration * ratio).toInt
     // don't worry about supported execs for these, as they are mostly an indicator of I/O
@@ -816,11 +824,11 @@ object QualificationAppInfo extends Logging {
   val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5
   // Below is the total time taken whenever there are ColumnarToRow or RowToColumnar transitions.
   // This includes the time taken to convert the data from one format to another and the time taken
-  // to transfer the data from CPU to GPU and vice versa. Current transfer rate is 10MB/s and is
-  // based on the testing on few eventlogs.
+  // to transfer the data from CPU to GPU and vice versa. Current transfer rate is 1GB/s and is
+  // based on testing on a few candidate eventlogs.
   // TODO: Need to test this on more eventlogs including NDS queries
   // and come up with a better transfer rate.
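   // Note: the rate below is in bytes per second, so 1000000000L corresponds to
   // roughly 1 GB/s, in line with the PCI-E Gen3 assumption noted above.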
- val CPU_GPU_TRANSFER_RATE = 10000000L + val CPU_GPU_TRANSFER_RATE = 1000000000L private def handleException(e: Exception, path: EventLogInfo): String = { val message: String = e match { From 8dfa245ceb6e6f33a12789d9781413276b1a2995 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 10 Oct 2023 15:04:04 -0700 Subject: [PATCH 09/16] address review comments --- .../rapids/tool/qualification/QualOutputWriter.scala | 2 +- .../tool/qualification/QualificationArgs.scala | 12 +++++++----- .../tool/qualification/QualificationMain.scala | 2 +- .../rapids/tool/planparser/SqlPlanParserSuite.scala | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala index 704136422..d4eff43fc 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala @@ -391,7 +391,7 @@ object QualOutputWriter { val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup" val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved" val STAGE_ESTIMATED_STR = "Stage Estimated" - val NUM_TRANSITIONS = "Number of CPU-GPU Transitions" + val NUM_TRANSITIONS = "Number of transitions from or to GPU" val UNSUPPORTED_EXECS = "Unsupported Execs" val UNSUPPORTED_EXPRS = "Unsupported Expressions" val CLUSTER_TAGS = "Cluster Tags" diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index d4f359061..883f48478 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -112,11 +112,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[Boolean](required = false, descr = "Whether to parse ML functions in the eventlogs. Default is false.", default = Some(false)) - val ignoreTransitions: ScallopOption[Boolean] = - opt[Boolean](required = false, - descr = "Whether to ignore durations for ColumnarToRow and RowToColumnar transitions " + - "in the eventlogs while calculating the speedup. Default is false.", - default = Some(false)) + val penalizeTransitions: ScallopOption[Boolean] = + toggle("penalize-transitions", + default = Some(true), + prefix = "no-", + descrYes = "Add penalty for ColumnarToRow and RowToColumnar transitions. 
" + + "Enabled by default.", + descrNo = "Do not add penalty for ColumnarToRow and RowToColumnar transitions.") val sparkProperty: ScallopOption[List[String]] = opt[List[String]](required = false, descr = "Filter applications based on certain Spark properties that were set during " + diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 905a62972..3de5826b8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -60,7 +60,7 @@ object QualificationMain extends Logging { val reportSqlLevel = appArgs.perSql.getOrElse(false) val platform = appArgs.platform.getOrElse("onprem") val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false) - val ignoreTransitions = appArgs.ignoreTransitions.getOrElse(false) + val ignoreTransitions = appArgs.penalizeTransitions.getOrElse(true) val hadoopConf = RapidsToolsConfUtil.newHadoopConf diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index efe51e5d6..f8d372995 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -76,7 +76,7 @@ class SQLPlanParserSuite extends BaseTestSuite { val pluginTypeChecker = new PluginTypeChecker() assert(allEventLogs.size == 1) val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf, - pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, ignoreTransitions = false) + pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true) appResult match { case Right(app) => app case Left(_) => throw new AssertionError("Cannot create application") From 7e1ebe032e9430eacda1587fbfb854b750dd8480 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 10 Oct 2023 18:28:38 -0700 Subject: [PATCH 10/16] update tests --- .../qualification/QualificationAppInfo.scala | 42 ++++++++++++------- .../db_sim_test_expectation.csv | 2 +- .../directory_test_expectation.csv | 2 +- .../jdbc_expectation.csv | 2 +- .../qual_test_missing_sql_end_expectation.csv | 2 +- .../qual_test_simple_expectation.csv | 6 +-- .../qual_test_simple_expectation_persql.csv | 14 +++---- .../read_dsv1_expectation.csv | 2 +- .../read_dsv2_expectation.csv | 2 +- .../spark2_expectation.csv | 2 +- .../truncated_1_end_expectation.csv | 2 +- .../qualification/QualificationSuite.scala | 2 +- 12 files changed, 47 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 322fc04e4..a5cacb675 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -40,7 +40,7 @@ class QualificationAppInfo( reportSqlLevel: Boolean, perSqlOnly: Boolean = false, mlOpsEnabled: Boolean = false, - ignoreTransitions: Boolean = false) + penalizeTransitions: Boolean = true) extends AppBase(eventLogInfo, hadoopConf) with Logging { var appId: String = "" @@ -247,13 +247,13 @@ class 
QualificationAppInfo(
     stages.map { stageId =>
       val stageTaskTime = stageIdToTaskEndSum.get(stageId)
         .map(_.totalTaskDuration).getOrElse(0L)
-      val numTransitions = ignoreTransitions match {
-        case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
-        case true => 0
+      val numTransitions = penalizeTransitions match {
+        case true => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
+        case false => 0
       }
       val transitionsTime = numTransitions match {
         case 0 => 0L // no transitions
-        case gpuCpuTransitions if gpuCpuTransitions > 0 =>
+        case gpuCpuTransitions =>
           // Duration to transfer data from GPU to CPU and vice versa.
           // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
           // spilled to disk.
@@ -271,22 +271,27 @@ class QualificationAppInfo(
         case _ => 0L
       }
+      // Add additional time to unsupportedDurations to account for the transitions time.
       val finalEachStageUnsupported = if (transitionsTime != 0) {
-        (allStageTaskTime * numUnsupported.size / allFlattenedExecs.size.toDouble).toLong
+        allStageTaskTime / allFlattenedExecs.size + eachStageUnsupported + transitionsTime
       } else {
         eachStageUnsupported
       }
+      // If we have unsupported execs, then there would be transitions between the CPU and GPU,
+      // so we reduce the speedup factor by the ratio of unsupported execs to total execs.
+      val updatedSpeedupFactor = if (numTransitions > 0) {
+        val unsupportedExecRatio = numUnsupported.size / allFlattenedExecs.size.toDouble
+        math.min(allSpeedupFactorAvg, allSpeedupFactorAvg * (1 - unsupportedExecRatio))
+      } else {
+        allSpeedupFactorAvg
+      }
-      StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
+      StageQualSummaryInfo(stageId, updatedSpeedupFactor, stageTaskTime,
         finalEachStageUnsupported, numTransitions, transitionsTime, estimated)
     }.toSet
   }

-  def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
-    val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)
-
-    // Get the total number of transitions between CPU and GPU for each stage and
-    // store it in a Map.
+  private def setNumberOfTransitions(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Unit = {
     allStagesToExecs.foreach { case (stageId, execs) =>
       // Flatten all the Execs within a stage.
       // Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange
@@ -319,6 +324,15 @@ class QualificationAppInfo(
       }
       stageIdToGpuCpuTransitions(stageId) = transitions
     }
+  }
+
+  def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
+    val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)
+
+    // Get the total number of transitions between CPU and GPU for each stage and
+    // store it in a Map.
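+    // (A "transition" is an adjacent pair of execs within a stage where exactly
+    // one of the pair is supported on the GPU.)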
+ setNumberOfTransitions(allStagesToExecs) + if (allStagesToExecs.isEmpty) { // use job level // also get the job ids associated with the SQLId @@ -922,10 +936,10 @@ object QualificationAppInfo extends Logging { pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, mlOpsEnabled: Boolean, - ignoreTransitions: Boolean): Either[String, QualificationAppInfo] = { + penalizeTransitions: Boolean): Either[String, QualificationAppInfo] = { try { val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker, - reportSqlLevel, false, mlOpsEnabled, ignoreTransitions) + reportSqlLevel, false, mlOpsEnabled, penalizeTransitions) logInfo(s"${path.eventLog.toString} has App: ${app.appId}") Right(app) } catch { diff --git a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv index a35973305..4387b19e2 100644 --- a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1623876083964","Recommended",2.1,63691.95,70165.04,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,4.05,false,"Scan;SerializeFromObject","",30 +"Spark shell","local-1623876083964","Recommended",1.96,68143.02,65713.97,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,3.4,false,"Scan;SerializeFromObject","",30 diff --git a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv index a35973305..4387b19e2 100644 --- a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1623876083964","Recommended",2.1,63691.95,70165.04,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,4.05,false,"Scan;SerializeFromObject","",30 +"Spark shell","local-1623876083964","Recommended",1.96,68143.02,65713.97,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,3.4,false,"Scan;SerializeFromObject","",30 diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv 
b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv index eb8a72b10..3fbbbe489 100644 --- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 +"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569544.77,2422.22,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.24,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv index 63a05f495..1fdc8242d 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622561780883","Not Recommended",1.0,7673.0,0.0,0,40448,7673,0,55.94,"","","","","","",0,4003,8096,32352,4.05,false,"Scan;SerializeFromObject","",30 +"Rapids Spark Profiling Tool Unit Tests","local-1622561780883","Not Recommended",1.0,7673.0,0.0,0,40448,7673,0,55.94,"","","","","","",0,4003,8096,32352,3.4,false,"Scan;SerializeFromObject","",30 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv index f43f88c8d..d9f813069 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv @@ -1,5 +1,5 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL 
Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 -"Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 -"Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.79,9080.38,7238.61,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.16,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 +"Spark shell","local-1651188809790","Not Recommended",1.0,166215.0,0.0,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.0,false,"CollectLimit;Scan json;Project","UDF",1 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,0,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4664,2,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 +"Spark shell","local-1651187225439","Not Recommended",0.99,355842.81,-205.82,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,0.63,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv index 33a0ceccd..9fe87989b 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv @@ -1,14 +1,12 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated GPU Duration,Estimated GPU Speedup,Estimated GPU Time Saved,Recommendation -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2078.49,3.43,5064.5,"Strongly Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,800.56,2.56,1251.43,"Strongly Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,763.96,2.53,1169.03,"Strongly Recommended" -"Spark shell","local-1651187225439",0,"show at :26",498,249,373.5,1.33,124.5,"Recommended" -"Spark shell","local-1651188809790",1,"show at :26",196,98,147.0,1.33,49.0,"Recommended" -"Spark shell","local-1651187225439",1,"show at :26",262,60,240.54,1.08,21.45,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at 
QualificationInfoUtils.scala:76",1306,187,1246.97,1.04,59.02,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2399.76,2.97,4743.23,"Strongly Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,879.95,2.33,1172.04,"Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,838.12,2.3,1094.87,"Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,0,1209.0,1.0,0.0,"Not Recommended" "Spark shell","local-1651188809790",0,"show at :26",715,2,715.0,1.0,0.0,"Not Recommended" +"Spark shell","local-1651187225439",0,"show at :26",498,249,498.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,0,321.0,1.0,0.0,"Not Recommended" +"Spark shell","local-1651188809790",1,"show at :26",196,98,196.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,0,129.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at QualificationInfoUtils.scala:136",127,0,127.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",6,"json at QualificationInfoUtils.scala:130",110,0,110.0,1.0,0.0,"Not Recommended" @@ -16,3 +14,5 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",4,"createOrReplaceTempView at QualificationInfoUtils.scala:133",22,22,22.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",7,"createOrReplaceTempView at QualificationInfoUtils.scala:133",4,4,4.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",1,"createOrReplaceTempView at QualificationInfoUtils.scala:133",2,2,2.0,1.0,0.0,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,187,2010.77,0.64,-704.78,"Not Recommended" +"Spark shell","local-1651187225439",1,"show at :26",262,60,434.08,0.6,-172.09,"Not Recommended" diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv index f624cc260..88023a25c 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371544219","Not 
Recommended",1.0,174691.42,601.57,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 +"Spark shell","local-1624371544219","Not Recommended",1.0,174728.8,564.19,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.2,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv index 5cb64b660..bfc7a8aad 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371906627","Not Recommended",1.0,83172.84,565.15,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 +"Spark shell","local-1624371906627","Not Recommended",1.0,83207.96,530.03,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.2,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv index 7510bbb42..2e278308e 100644 --- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1634253215009","Not Recommended",1.01,46352.24,710.75,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,3.36,false,"CollectLimit;Scan text","",30 +"Spark shell","local-1634253215009","Not Recommended",1.01,46400.01,662.98,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,2.9,false,"CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv index 
07d08a0c3..0b6336f12 100644 --- a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.09,4468.98,403.01,1306,14353,4872,570,62.67,"","","JSON","","","",1306,4477,8086,6267,3.41,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.08,4491.83,380.16,1306,14353,4872,570,62.67,"","","JSON","","","",1306,4477,8086,6267,3.0,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 502ddffc1..e314e98ac 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -288,7 +288,7 @@ class QualificationSuite extends BaseTestSuite { assert(lines.size == (4 + 4)) // skip the 3 header lines val firstRow = lines(3) - assert(firstRow.contains("local-1623281204390")) + assert(firstRow.contains("local-1651187225439")) } finally { inputSource.close() } From 132605f358fc92cb80af697bf1405f2eba3e6acc Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Wed, 11 Oct 2023 15:33:34 -0700 Subject: [PATCH 11/16] Revert "update tests" This reverts commit 7e1ebe032e9430eacda1587fbfb854b750dd8480. 
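For context, the change being reverted scaled each stage's average speedup factor down by the
fraction of unsupported execs whenever CPU-GPU transitions were present. A minimal,
self-contained sketch of that heuristic (object, method, and parameter names here are
illustrative, not the tool's API):

    object TransitionPenaltySketch {
      // Mirrors the reverted updatedSpeedupFactor logic: when a stage has
      // CPU-GPU transitions, scale the average speedup down by the fraction of
      // unsupported execs; never scale it up.
      def updatedSpeedupFactor(
          allSpeedupFactorAvg: Double,
          numUnsupported: Int,
          numExecs: Int,
          numTransitions: Int): Double = {
        if (numTransitions > 0 && numExecs > 0) {
          val unsupportedExecRatio = numUnsupported.toDouble / numExecs
          math.min(allSpeedupFactorAvg, allSpeedupFactorAvg * (1 - unsupportedExecRatio))
        } else {
          allSpeedupFactorAvg
        }
      }

      def main(args: Array[String]): Unit = {
        // Example (hypothetical counts): 4 unsupported execs out of 10 with
        // transitions present reduces a 4.05 average speedup to
        // 4.05 * (1 - 0.4) ~= 2.43.
        println(updatedSpeedupFactor(4.05, 4, 10, 2))
      }
    }

Reverting restores the earlier behavior, where transitions penalize only the unsupported
duration and the speedup factor itself is left unchanged.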
---
 .../qualification/QualificationAppInfo.scala | 42 +++++++------------
 .../db_sim_test_expectation.csv | 2 +-
 .../directory_test_expectation.csv | 2 +-
 .../jdbc_expectation.csv | 2 +-
 .../qual_test_missing_sql_end_expectation.csv | 2 +-
 .../qual_test_simple_expectation.csv | 6 +--
 .../qual_test_simple_expectation_persql.csv | 14 +++----
 .../read_dsv1_expectation.csv | 2 +-
 .../read_dsv2_expectation.csv | 2 +-
 .../spark2_expectation.csv | 2 +-
 .../truncated_1_end_expectation.csv | 2 +-
 .../qualification/QualificationSuite.scala | 2 +-
 12 files changed, 33 insertions(+), 47 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index a5cacb675..322fc04e4 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -40,7 +40,7 @@ class QualificationAppInfo(
     reportSqlLevel: Boolean,
     perSqlOnly: Boolean = false,
     mlOpsEnabled: Boolean = false,
-    penalizeTransitions: Boolean = true)
+    ignoreTransitions: Boolean = false)
   extends AppBase(eventLogInfo, hadoopConf) with Logging {

   var appId: String = ""
@@ -247,13 +247,13 @@ class QualificationAppInfo(
     stages.map { stageId =>
       val stageTaskTime = stageIdToTaskEndSum.get(stageId)
         .map(_.totalTaskDuration).getOrElse(0L)
-      val numTransitions = penalizeTransitions match {
-        case true => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
-        case false => 0
+      val numTransitions = ignoreTransitions match {
+        case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
+        case true => 0
       }
       val transitionsTime = numTransitions match {
         case 0 => 0L // no transitions
-        case gpuCpuTransitions =>
+        case gpuCpuTransitions if gpuCpuTransitions > 0 =>
           // Duration to transfer data from GPU to CPU and vice versa.
           // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
           // spilled to disk.
@@ -271,27 +271,22 @@ class QualificationAppInfo(
         case _ => 0L
       }
-      // Add additional time to unsupportedDurations to account for the transitions time.
       val finalEachStageUnsupported = if (transitionsTime != 0) {
-        allStageTaskTime / allFlattenedExecs.size + eachStageUnsupported + transitionsTime
+        (allStageTaskTime * numUnsupported.size / allFlattenedExecs.size.toDouble).toLong
       } else {
         eachStageUnsupported
       }
-      // If we have unsupported execs, then there would be transitions between the CPU and GPU,
-      // so we reduce the speedup factor by the ratio of unsupported execs to total execs.
-      val updatedSpeedupFactor = if (numTransitions > 0) {
-        val unsupportedExecRatio = numUnsupported.size / allFlattenedExecs.size.toDouble
-        math.min(allSpeedupFactorAvg, allSpeedupFactorAvg * (1 - unsupportedExecRatio))
-      } else {
-        allSpeedupFactorAvg
-      }
-      StageQualSummaryInfo(stageId, updatedSpeedupFactor, stageTaskTime,
+      StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
         finalEachStageUnsupported, numTransitions, transitionsTime, estimated)
     }.toSet
   }

-  private def setNumberOfTransitions(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Unit = {
+  def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
+    val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)
+
+    // Get the total number of transitions between CPU and GPU for each stage and
+    // store it in a Map.
allStagesToExecs.foreach { case (stageId, execs) => // Flatten all the Execs within a stage. // Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange @@ -324,15 +319,6 @@ class QualificationAppInfo( } stageIdToGpuCpuTransitions(stageId) = transitions } - } - - def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = { - val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos) - - // Get the total number of transitions between CPU and GPU for each stage and - // store it in a Map. - setNumberOfTransitions(allStagesToExecs) - if (allStagesToExecs.isEmpty) { // use job level // also get the job ids associated with the SQLId @@ -936,10 +922,10 @@ object QualificationAppInfo extends Logging { pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, mlOpsEnabled: Boolean, - penalizeTransitions: Boolean): Either[String, QualificationAppInfo] = { + ignoreTransitions: Boolean): Either[String, QualificationAppInfo] = { try { val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker, - reportSqlLevel, false, mlOpsEnabled, penalizeTransitions) + reportSqlLevel, false, mlOpsEnabled, ignoreTransitions) logInfo(s"${path.eventLog.toString} has App: ${app.appId}") Right(app) } catch { diff --git a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv index 4387b19e2..a35973305 100644 --- a/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1623876083964","Recommended",1.96,68143.02,65713.97,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,3.4,false,"Scan;SerializeFromObject","",30 +"Spark shell","local-1623876083964","Recommended",2.1,63691.95,70165.04,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,4.05,false,"Scan;SerializeFromObject","",30 diff --git a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv index 4387b19e2..a35973305 100644 --- a/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/directory_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration 
Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1623876083964","Recommended",1.96,68143.02,65713.97,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,3.4,false,"Scan;SerializeFromObject","",30
+"Spark shell","local-1623876083964","Recommended",2.1,63691.95,70165.04,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,4.05,false,"Scan;SerializeFromObject","",30
diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
index 3fbbbe489..eb8a72b10 100644
--- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569544.77,2422.22,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.24,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
+"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
diff --git a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
index 1fdc8242d..63a05f495 100644
--- a/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Rapids Spark Profiling Tool Unit Tests","local-1622561780883","Not Recommended",1.0,7673.0,0.0,0,40448,7673,0,55.94,"","","","","","",0,4003,8096,32352,3.4,false,"Scan;SerializeFromObject","",30
+"Rapids Spark Profiling Tool Unit Tests","local-1622561780883","Not Recommended",1.0,7673.0,0.0,0,40448,7673,0,55.94,"","","","","","",0,4003,8096,32352,4.05,false,"Scan;SerializeFromObject","",30
diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
index d9f813069..f43f88c8d 100644
--- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
@@ -1,5 +1,5 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.79,9080.38,7238.61,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.16,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1
-"Spark shell","local-1651188809790","Not Recommended",1.0,166215.0,0.0,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.0,false,"CollectLimit;Scan json;Project","UDF",1
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1
+"Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1
+"Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,0,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4664,2,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1
-"Spark shell","local-1651187225439","Not Recommended",0.99,355842.81,-205.82,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,0.63,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1
diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv
index 9fe87989b..33a0ceccd 100644
--- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv
+++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv
@@ -1,12 +1,14 @@
 App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated GPU Duration,Estimated GPU Speedup,Estimated GPU Time Saved,Recommendation
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2399.76,2.97,4743.23,"Strongly Recommended"
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,879.95,2.33,1172.04,"Recommended"
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,838.12,2.3,1094.87,"Recommended"
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2078.49,3.43,5064.5,"Strongly Recommended"
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,800.56,2.56,1251.43,"Strongly Recommended"
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,763.96,2.53,1169.03,"Strongly Recommended"
+"Spark shell","local-1651187225439",0,"show at :26",498,249,373.5,1.33,124.5,"Recommended"
+"Spark shell","local-1651188809790",1,"show at :26",196,98,147.0,1.33,49.0,"Recommended"
+"Spark shell","local-1651187225439",1,"show at :26",262,60,240.54,1.08,21.45,"Not Recommended"
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,187,1246.97,1.04,59.02,"Not Recommended"
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,0,1209.0,1.0,0.0,"Not Recommended"
 "Spark shell","local-1651188809790",0,"show at :26",715,2,715.0,1.0,0.0,"Not Recommended"
-"Spark shell","local-1651187225439",0,"show at :26",498,249,498.0,1.0,0.0,"Not Recommended"
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,0,321.0,1.0,0.0,"Not Recommended"
-"Spark shell","local-1651188809790",1,"show at :26",196,98,196.0,1.0,0.0,"Not Recommended"
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,0,129.0,1.0,0.0,"Not Recommended"
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at QualificationInfoUtils.scala:136",127,0,127.0,1.0,0.0,"Not Recommended"
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",6,"json at QualificationInfoUtils.scala:130",110,0,110.0,1.0,0.0,"Not Recommended"
@@ -14,5 +16,3 @@
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",4,"createOrReplaceTempView at QualificationInfoUtils.scala:133",22,22,22.0,1.0,0.0,"Not Recommended"
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",7,"createOrReplaceTempView at QualificationInfoUtils.scala:133",4,4,4.0,1.0,0.0,"Not Recommended"
 "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",1,"createOrReplaceTempView at QualificationInfoUtils.scala:133",2,2,2.0,1.0,0.0,"Not Recommended"
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,187,2010.77,0.64,-704.78,"Not Recommended"
-"Spark shell","local-1651187225439",1,"show at :26",262,60,434.08,0.6,-172.09,"Not Recommended"
diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
index 88023a25c..f624cc260 100644
--- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1624371544219","Not Recommended",1.0,174728.8,564.19,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.2,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30
+"Spark shell","local-1624371544219","Not Recommended",1.0,174691.42,601.57,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30
diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
index bfc7a8aad..5cb64b660 100644
--- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1624371906627","Not Recommended",1.0,83207.96,530.03,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.2,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30
+"Spark shell","local-1624371906627","Not Recommended",1.0,83172.84,565.15,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30
diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv
index 2e278308e..7510bbb42 100644
--- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1634253215009","Not Recommended",1.01,46400.01,662.98,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,2.9,false,"CollectLimit;Scan text","",30
+"Spark shell","local-1634253215009","Not Recommended",1.01,46352.24,710.75,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,3.36,false,"CollectLimit;Scan text","",30
diff --git a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
index 0b6336f12..07d08a0c3 100644
--- a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.08,4491.83,380.16,1306,14353,4872,570,62.67,"","","JSON","","","",1306,4477,8086,6267,3.0,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.09,4468.98,403.01,1306,14353,4872,570,62.67,"","","JSON","","","",1306,4477,8086,6267,3.41,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index e314e98ac..502ddffc1 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -288,7 +288,7 @@ class QualificationSuite extends BaseTestSuite {
         assert(lines.size == (4 + 4))
         // skip the 3 header lines
         val firstRow = lines(3)
-        assert(firstRow.contains("local-1651187225439"))
+        assert(firstRow.contains("local-1623281204390"))
       } finally {
         inputSource.close()
       }

From f6083650dec81e95563d5ac9ce2e054d2a894ebe Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Thu, 12 Oct 2023 11:34:08 -0700
Subject: [PATCH 12/16] add penalty to durations

---
 .../qualification/QualificationAppInfo.scala  | 32 +++++++++++--------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index 322fc04e4..9be9d92a5 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -40,7 +40,7 @@ class QualificationAppInfo(
     reportSqlLevel: Boolean,
     perSqlOnly: Boolean = false,
     mlOpsEnabled: Boolean = false,
-    ignoreTransitions: Boolean = false)
+    penalizeTransitions: Boolean = true)
   extends AppBase(eventLogInfo, hadoopConf) with Logging {

   var appId: String = ""
@@ -247,13 +247,13 @@ class QualificationAppInfo(
     stages.map { stageId =>
       val stageTaskTime = stageIdToTaskEndSum.get(stageId)
         .map(_.totalTaskDuration).getOrElse(0L)
-      val numTransitions = ignoreTransitions match {
-        case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
-        case true => 0
+      val numTransitions = penalizeTransitions match {
+        case true => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
+        case false => 0
       }
       val transitionsTime = numTransitions match {
         case 0 => 0L // no transitions
-        case gpuCpuTransitions if gpuCpuTransitions > 0 =>
+        case gpuCpuTransitions =>
           // Duration to transfer data from GPU to CPU and vice versa.
           // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
           // spilled to disk.
@@ -272,7 +272,8 @@ class QualificationAppInfo(
         case _ => 0L
       }
       val finalEachStageUnsupported = if (transitionsTime != 0) {
-        (allStageTaskTime * numUnsupported.size / allFlattenedExecs.size.toDouble).toLong
+        // Add 20% penalty for unsupported duration if there are transitions.
+        (eachStageUnsupported * 0.2 + eachStageUnsupported).toLong
       } else {
         eachStageUnsupported
       }
@@ -282,11 +283,7 @@
     }.toSet
   }

-  def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
-    val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)
-
-    // Get the total number of transitions between CPU and GPU for each stage and
-    // store it in a Map.
+  private def setNumberOfTransitions(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Unit = {
     allStagesToExecs.foreach { case (stageId, execs) =>
       // Flatten all the Execs within a stage.
       // Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange
@@ -319,6 +316,15 @@
       }
       stageIdToGpuCpuTransitions(stageId) = transitions
     }
+  }
+
+  def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
+    val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)
+
+    // Get the total number of transitions between CPU and GPU for each stage and
+    // store it in a Map.
+    setNumberOfTransitions(allStagesToExecs)
+
     if (allStagesToExecs.isEmpty) {
       // use job level
       // also get the job ids associated with the SQLId
@@ -922,10 +928,10 @@
       pluginTypeChecker: PluginTypeChecker,
       reportSqlLevel: Boolean,
       mlOpsEnabled: Boolean,
-      ignoreTransitions: Boolean): Either[String, QualificationAppInfo] = {
+      penalizeTransitions: Boolean): Either[String, QualificationAppInfo] = {
     try {
       val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker,
-        reportSqlLevel, false, mlOpsEnabled, ignoreTransitions)
+        reportSqlLevel, false, mlOpsEnabled, penalizeTransitions)
       logInfo(s"${path.eventLog.toString} has App: ${app.appId}")
       Right(app)
     } catch {

From f8c8d4076a1bcaf6cdf92849c64399f75614ee50 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Thu, 12 Oct 2023 17:11:29 -0700
Subject: [PATCH 13/16] change transition time calculation

---
 .../sql/rapids/tool/qualification/QualificationAppInfo.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index 9be9d92a5..fe34b9bea 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -263,8 +263,9 @@ class QualificationAppInfo(
             stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L)
           }
           if (totalBytesRead > 0) {
-            (TimeUnit.SECONDS.toMillis(totalBytesRead /
-              QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * gpuCpuTransitions)
+            val transitionTime = (totalBytesRead /
+              QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * gpuCpuTransitions
+            (transitionTime * 1000).toLong // convert to milliseconds
           } else {
            0L
          }
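To make the revised estimate above concrete: the tool divides the bytes a stage read by the assumed transfer rate, scales by the number of CPU-GPU transitions, and converts to milliseconds. A standalone sketch follows; the object and method names are illustrative rather than the tool's API, and the 1 GB/s constant is the value this series settles on.

```scala
// Standalone sketch of the transition-penalty estimate from the hunk above.
// CpuGpuTransferRate mirrors QualificationAppInfo.CPU_GPU_TRANSFER_RATE (1 GB/s).
object TransitionPenaltySketch {
  val CpuGpuTransferRate = 1000000000L // bytes per second

  /** Estimated milliseconds spent moving data across CPU<->GPU boundaries. */
  def transitionTimeMs(totalBytesRead: Long, gpuCpuTransitions: Int): Long = {
    if (totalBytesRead > 0 && gpuCpuTransitions > 0) {
      val seconds = (totalBytesRead / CpuGpuTransferRate.toDouble) * gpuCpuTransitions
      (seconds * 1000).toLong // Spark metrics are in milliseconds
    } else {
      0L
    }
  }

  def main(args: Array[String]): Unit = {
    // A stage that read 2 GB and crosses the CPU/GPU boundary 3 times:
    // 2e9 / 1e9 = 2 s per transition, times 3 transitions = 6 s = 6000 ms.
    println(transitionTimeMs(2000000000L, 3)) // prints 6000
  }
}
```

The switch to floating-point division matters: the earlier `TimeUnit.SECONDS.toMillis(totalBytesRead / CPU_GPU_TRANSFER_RATE)` used Long integer division, so any stage that read less than the transfer-rate denominator truncated to a zero penalty.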
From 36f8d56d9697bccddef971e773a840b75402bb1a Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Fri, 13 Oct 2023 09:11:40 -0700
Subject: [PATCH 14/16] update variable name

---
 .../spark/rapids/tool/qualification/Qualification.scala     | 4 ++--
 .../spark/rapids/tool/qualification/QualificationMain.scala | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
index 0fa13fddb..e733d73c2 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
@@ -35,7 +35,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
    pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
    printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
    reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
-    ignoreTransitions: Boolean) extends Logging {
+    penalizeTransitions: Boolean) extends Logging {

  private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

@@ -167,7 +167,7 @@
    try {
      val startTime = System.currentTimeMillis()
      val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
-        reportSqlLevel, mlOpsEnabled, ignoreTransitions)
+        reportSqlLevel, mlOpsEnabled, penalizeTransitions)
      val qualAppResult = appResult match {
        case Left(errorMessage: String) =>
          // Case when an error occurred during QualificationAppInfo creation
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala
index 3de5826b8..cb8a3c583 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala
@@ -60,7 +60,7 @@ object QualificationMain extends Logging {
    val reportSqlLevel = appArgs.perSql.getOrElse(false)
    val platform = appArgs.platform.getOrElse("onprem")
    val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false)
-    val ignoreTransitions = appArgs.penalizeTransitions.getOrElse(true)
+    val penalizeTransitions = appArgs.penalizeTransitions.getOrElse(true)

    val hadoopConf = RapidsToolsConfUtil.newHadoopConf

@@ -94,7 +94,7 @@
    val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf,
      timeout, nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled,
-      enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, ignoreTransitions)
+      enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions)
    val res = qual.qualifyApps(filteredLogs)
    (0, res)
  }
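For reference while reading the rename in the next patch, here is what the per-stage transition count actually measures: adjacent execs in a flattened stage plan that flip between supported (GPU) and unsupported (CPU). A minimal, self-contained sketch, using a simplified stand-in for the tool's ExecInfo; the pairwise boolean inequality is equivalent to the tool's two-clause `&&`/`||` condition.

```scala
// Minimal model of the per-stage CPU<->GPU transition count. `Exec` is a
// stand-in for ExecInfo; only the isSupported flag matters here.
case class Exec(name: String, isSupported: Boolean)

object TransitionCountSketch {
  /** Count adjacent pairs that flip between supported and unsupported. */
  def countTransitions(execs: Seq[Exec]): Int = {
    if (execs.size > 1) {
      execs.sliding(2).count(pair => pair(0).isSupported != pair(1).isSupported)
    } else {
      0
    }
  }

  def main(args: Array[String]): Unit = {
    // Project (GPU) -> UDF filter (CPU) -> HashAggregate (GPU): two boundary
    // crossings, i.e. one ColumnarToRow plus one RowToColumnar.
    val stage = Seq(
      Exec("Project", isSupported = true),
      Exec("Filter with UDF", isSupported = false),
      Exec("HashAggregate", isSupported = true))
    println(countTransitions(stage)) // prints 2
  }
}
```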
From 4a8ab847bd0499d31c2df2d36ae5b90a01e673a3 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Fri, 13 Oct 2023 11:28:12 -0700
Subject: [PATCH 15/16] addressed review comments

---
 .../tool/qualification/QualificationAppInfo.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index fe34b9bea..a74193dd7 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -284,7 +284,7 @@ class QualificationAppInfo(
     }.toSet
   }

-  private def setNumberOfTransitions(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Unit = {
+  private def calculateNumberOfTransitions(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Unit = {
     allStagesToExecs.foreach { case (stageId, execs) =>
       // Flatten all the Execs within a stage.
       // Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange
@@ -324,7 +324,7 @@ class QualificationAppInfo(

     // Get the total number of transitions between CPU and GPU for each stage and
     // store it in a Map.
-    setNumberOfTransitions(allStagesToExecs)
+    calculateNumberOfTransitions(allStagesToExecs)

     if (allStagesToExecs.isEmpty) {
       // use job level
@@ -506,7 +506,7 @@ class QualificationAppInfo(
     val allStagesSummary = perSqlStageSummary.flatMap(_.stageSum)
       .map(sum => sum.stageId -> sum).toMap.values.toSeq
     val sqlDataframeTaskDuration = allStagesSummary.map(s => s.stageTaskTime).sum
-    val totalTransitionsTime = allStagesSummary.map(s=> s.transitionTime).sum
+    val totalTransitionsTime = allStagesSummary.map(s => s.transitionTime).sum
     val unsupportedSQLTaskDuration = calculateSQLUnsupportedTaskDuration(allStagesSummary)
     val endDurationEstimated = this.appEndTime.isEmpty && appDuration > 0
     val jobOverheadTime = calculateJobOverHeadTime(info.startTime)
@@ -567,6 +567,8 @@ class QualificationAppInfo(
     }

     // get the ratio based on the Task durations that we will use for wall clock durations
+    // totalTransitionsTime is the overhead time for ColumnarToRow/RowToColumnar transitions,
+    // which impacts the GPU ratio.
     val estimatedGPURatio = if (sqlDataframeTaskDuration > 0) {
       supportedSQLTaskDuration.toDouble / (
         sqlDataframeTaskDuration.toDouble + totalTransitionsTime.toDouble)
@@ -833,8 +835,6 @@ object QualificationAppInfo extends Logging {
   // This includes the time taken to convert the data from one format to another and the time taken
   // to transfer the data from CPU to GPU and vice versa. Current transfer rate is 1GB/s and is
   // based on the testing on few candidate eventlogs.
-  // TODO: Need to test this on more eventlogs including NDS queries
-  // and come up with a better transfer rate.
   val CPU_GPU_TRANSFER_RATE = 1000000000L

From 052b22f66ac249aa3d72f2f7e8a7eb1b98487ec6 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Sun, 15 Oct 2023 21:42:29 -0700
Subject: [PATCH 16/16] change penalty percentage

---
 .../rapids/tool/qualification/QualificationAppInfo.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index a74193dd7..028c904ed 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -273,8 +273,10 @@ class QualificationAppInfo(
       case _ => 0L
     }
     val finalEachStageUnsupported = if (transitionsTime != 0) {
-      // Add 20% penalty for unsupported duration if there are transitions.
-      (eachStageUnsupported * 0.2 + eachStageUnsupported).toLong
+      // Add a 50% penalty to the unsupported duration if there are transitions. This
+      // value is a heuristic; it roughly matches what we observed in experiments with
+      // customer and NDS event logs.
+      (eachStageUnsupported * 0.5 + eachStageUnsupported).toLong
     } else {
       eachStageUnsupported
     }
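Taken together, the series leaves two penalties in place: an estimated CPU-GPU transfer time added to each stage's task time, and a 50% inflation of a stage's unsupported duration whenever transitions exist, with the transfer time also diluting the GPU ratio used for wall-clock estimates. A compact sketch of how they compose; the numbers and field names are illustrative, not the tool's exact API (see StageQualSummaryInfo and the hunks above for the real fields).

```scala
// Illustrative composition of the two penalties for a single stage. The GPU
// ratio is computed app-wide in the tool; it is shown here for one stage only.
object StagePenaltySketch {
  def main(args: Array[String]): Unit = {
    val stageTaskTime = 10000L       // total task duration for the stage, ms
    val unsupportedTaskTime = 4000L  // portion attributed to unsupported execs, ms
    val transitionsTimeMs = 6000L    // estimated CPU<->GPU transfer time, ms

    // Patch 16: inflate the unsupported duration by 50% when transitions exist.
    val penalizedUnsupported =
      if (transitionsTimeMs != 0) (unsupportedTaskTime * 0.5 + unsupportedTaskTime).toLong
      else unsupportedTaskTime

    // Patch 15: transition time is added to the denominator of the ratio of
    // supported task time, shrinking the estimated GPU opportunity.
    val supportedTaskTime = stageTaskTime - unsupportedTaskTime
    val gpuRatio = supportedTaskTime.toDouble / (stageTaskTime + transitionsTimeMs)

    println(penalizedUnsupported) // 6000: 4000 * 1.5
    println(f"$gpuRatio%.3f")     // 0.375: 6000 / 16000
  }
}
```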