From c34a38966529e0e30f046e8de038a50528029f9a Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 26 Oct 2023 17:25:28 -0700 Subject: [PATCH 1/6] Qualification tool: Enhance mapping of Execs to stages Signed-off-by: Niranjan Artal --- .../qualification/QualificationAppInfo.scala | 85 ++++++++++++++++--- .../jdbc_expectation.csv | 2 +- .../nds_q86_fail_test_expectation.csv | 2 +- .../nds_q86_fail_test_expectation_persql.csv | 2 +- .../nds_q86_test_expectation.csv | 2 +- .../nds_q86_test_expectation_persql.csv | 2 +- .../qual_test_simple_expectation.csv | 2 +- .../read_dsv1_expectation.csv | 2 +- .../read_dsv2_expectation.csv | 2 +- .../spark2_expectation.csv | 2 +- 10 files changed, 80 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 028c904ed..e107268bb 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -200,22 +200,79 @@ class QualificationAppInfo( private def getStageToExec(execInfos: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = { val execsWithoutStages = new ArrayBuffer[ExecInfo]() - val perStageSum = execInfos.flatMap { execInfo => - if (execInfo.stages.size > 1) { - execInfo.stages.map((_, execInfo)) - } else if (execInfo.stages.size < 1) { - // we don't know what stage its in or its duration - logDebug(s"No stage associated with ${execInfo.exec} " + - s"so speedup factor isn't applied anywhere.") - execsWithoutStages += execInfo - Seq.empty - } else { - Seq((execInfo.stages.head, execInfo)) - } - }.groupBy(_._1).map { case (stage, execInfos) => + + // This is to get the mapping between stageId and execs. This is primarily done based on + // accumulatorId. If an Exec has some metrics generated, then an accumulatorId will be + // generated for that Exec. This accumulatorId is used to get the stageId. If an Exec + // doesn't have any metrics, then we will try to get the stageId by looking at the + // neighbor Execs. If either of the neighbor Execs has a stageId, then we will use that + // to assign the same stageId to the current Exec as it's most likely that the current + // Exec is part of the same stage as the neighbor Exec. + val execInfosInOrder = execInfos.reverse + val execInfosCombined = Iterator(Seq(execInfosInOrder.head)) ++ + execInfosInOrder.sliding(3) ++ Iterator(Seq(execInfosInOrder.last)) + val perStageSum = execInfosCombined.map { + case Seq(prev, execInfo, next) => + val associatedStages = + if (execInfo.stages.size > 1) execInfo.stages.toSeq + else if (execInfo.stages.size < 1) { + val prevStages = if (prev.stages.size > 1) prev.stages else Seq.empty + val nextStages = if (next.stages.size > 1) next.stages else Seq.empty + val singlePrevStage = prev.stages.headOption.toSeq + val singleNextStage = next.stages.headOption.toSeq + val dedupedStages = Set(singlePrevStage ++ singleNextStage ++ prevStages ++ nextStages). 
+ flatten.toSeq + if (dedupedStages.nonEmpty) { + dedupedStages + } else { + // we don't know what stage its in or its duration + logDebug(s"No stage associated with ${execInfo.exec} " + + s"so speedup factor isn't applied anywhere.") + execsWithoutStages += execInfo + Seq.empty + } + } else { + Seq(execInfo.stages.head) + } + associatedStages.map((_, execInfo)) + + case Seq(prev, execInfo) => + val associatedStages = + if (execInfo.stages.size > 1) execInfo.stages.toSeq + else if (execInfo.stages.size < 1) { + val prevStages = if (prev.stages.size > 1) prev.stages.toSeq else Seq.empty + val singlePrevStage = prev.stages.headOption.toSeq + val dedupedStages = Set(singlePrevStage ++ prevStages).flatten.toSeq + if (dedupedStages.nonEmpty) { + dedupedStages + } else { + // we don't know what stage its in or its duration + logDebug(s"No stage associated with ${execInfo.exec} " + + s"so speedup factor isn't applied anywhere.") + execsWithoutStages += execInfo + Seq.empty + } + } else { + Seq(execInfo.stages.head) + } + associatedStages.map((_, execInfo)) + + case Seq(execInfo) => + val associatedStages = + if (execInfo.stages.size > 1) execInfo.stages.toSeq + else if (execInfo.stages.size < 1) { + execsWithoutStages += execInfo + Seq.empty + } else { + Seq(execInfo.stages.head) + } + associatedStages.map((_, execInfo)) + + case _ => Seq.empty + }.toSeq.flatten.groupBy(_._1).map { case (stage, execInfos) => (stage, execInfos.map(_._2)) } - (perStageSum, execsWithoutStages.toSeq) + (perStageSum, execsWithoutStages) } private def flattenedExecs(execs: Seq[ExecInfo]): Seq[ExecInfo] = { diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv index eb8a72b10..7dbea470a 100644 --- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 +"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569439.65,2527.34,3627,19894,571967,3470,28.41,"","JDBC[*]","","","","",1812,544575,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv index c646df568..76de7bc3a 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated 
GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19120.15,7050.84,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.8,false,"Execute CreateViewCommand","",30 +"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19216.48,6954.51,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.66,false,"Execute CreateViewCommand","",30 diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv index 26953acab..e1dc2018c 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv @@ -23,4 +23,4 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated "TPC-DS Like Bench q86","app-20210319163812-1778",17,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",8,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",23,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended" -"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2517.1,3.79,7047.89,"Not Applicable" +"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2613.38,3.66,6951.61,"Not Applicable" diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv index b58f5b8ef..985fa65d9 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19120.15,7050.84,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.8,false,"Execute CreateViewCommand","",30 +"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19216.48,6954.51,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.66,false,"Execute CreateViewCommand","",30 diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv 
b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv index 8dfe3cf65..67746ff8b 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv @@ -1,5 +1,5 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated GPU Duration,Estimated GPU Speedup,Estimated GPU Time Saved,Recommendation -"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2517.1,3.79,7047.89,"Strongly Recommended" +"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2613.38,3.66,6951.61,"Strongly Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",0,"Register input tables",2,2,2.0,1.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",21,"Register input tables",1,1,1.0,1.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",5,"Register input tables",1,1,1.0,1.0,0.0,"Not Recommended" diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv index f43f88c8d..60ec0150e 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv @@ -2,4 +2,4 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Esti "Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 "Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 "Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1 -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,0,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4664,2,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,1,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4662,4,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv index f624cc260..c261e2db4 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU 
Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371544219","Not Recommended",1.0,174691.42,601.57,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 +"Spark shell","local-1624371544219","Not Recommended",1.0,173961.11,1331.88,6695,20421,175293,2290,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13403,7018,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv index 5cb64b660..fbfaec015 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371906627","Not Recommended",1.0,83172.84,565.15,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 +"Spark shell","local-1624371906627","Not Recommended",1.01,82225.67,1512.32,6760,21802,83738,2412,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,13987,7815,2.68,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv index 7510bbb42..f2dbf947b 100644 --- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1634253215009","Not 
Recommended",1.01,46352.24,710.75,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,3.36,false,"CollectLimit;Scan text","",30 +"Spark shell","local-1634253215009","Not Recommended",1.01,46516.07,546.92,1520,359,47063,842,67.64,"","Text[*]","","","","",1068,44935,160,199,2.85,false,"CollectLimit;Scan text","",30 From 66c9607114aa6be455a8652141fa62cb832da5e1 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Wed, 1 Nov 2023 17:40:06 -0700 Subject: [PATCH 2/6] addressed review comments --- .../qualification/QualificationAppInfo.scala | 161 +++++++++++------- .../complex_dec_expectation.csv | 2 +- .../nds_q86_fail_test_expectation.csv | 2 +- .../nds_q86_fail_test_expectation_persql.csv | 2 +- .../nds_q86_test_expectation.csv | 2 +- .../nds_q86_test_expectation_persql.csv | 2 +- .../qual_test_simple_expectation.csv | 8 +- .../qual_test_simple_expectation_persql.csv | 10 +- .../read_dsv1_expectation.csv | 2 +- .../read_dsv2_expectation.csv | 2 +- .../spark2_expectation.csv | 2 +- .../truncated_1_end_expectation.csv | 2 +- .../write_format_expectation.csv | 2 +- 13 files changed, 118 insertions(+), 81 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index e107268bb..45e614376 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -198,6 +198,78 @@ class QualificationAppInfo( } } + private def singleExecMatch(execInfo: ExecInfo): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { + val associatedStages = { + if (execInfo.stages.size > 1) { + execInfo.stages.toSeq + } else if (execInfo.stages.size < 1) { + // we don't know what stage its in or its duration + logDebug(s"No stage associated with ${execInfo.exec} " + + s"so speedup factor isn't applied anywhere.") + Seq.empty + } else { + Seq(execInfo.stages.head) + } + } + if (associatedStages.nonEmpty) { + (associatedStages.map((_, execInfo)), None) + } else { + (Seq.empty, Some(execInfo)) + } + } + + private def doubleExecMatch(neighbor: ExecInfo, execInfo: ExecInfo): ( + Seq[(Int, ExecInfo)], Option[ExecInfo]) = { + val associatedStages = { + if (execInfo.stages.size > 1) { + execInfo.stages.toSeq + } else if (execInfo.stages.size < 1) { + if (neighbor.stages.size >= 1) { + neighbor.stages.headOption.toSeq + } else { + // we don't know what stage its in or its duration + logDebug(s"No stage associated with ${execInfo.exec} " + + s"so speedup factor isn't applied anywhere.") + Seq.empty + } + } else { + Seq(execInfo.stages.head) + } + } + if (associatedStages.nonEmpty) { + (associatedStages.map((_, execInfo)), None) + } else { + (Seq.empty, Some(execInfo)) + } + } + + private def tripleExecMatch(prev: ExecInfo, execInfo: ExecInfo, next: ExecInfo): + (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { + val associatedStages = { + if (execInfo.stages.size > 1) { + execInfo.stages.toSeq + } else if (execInfo.stages.size < 1) { + if (prev.stages.size >= 1) { + prev.stages.headOption.toSeq + } else if (next.stages.size >= 1) { + next.stages.headOption.toSeq + } else { + // we don't know what stage its in or its duration + logDebug(s"No stage associated with ${execInfo.exec} " + + s"so speedup factor isn't applied anywhere.") + Seq.empty + } + } else { + Seq(execInfo.stages.head) + } + } + if (associatedStages.nonEmpty) { + 
(associatedStages.map((_, execInfo)), None) + } else { + (Seq.empty, Some(execInfo)) + } + } + private def getStageToExec(execInfos: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = { val execsWithoutStages = new ArrayBuffer[ExecInfo]() @@ -209,69 +281,34 @@ class QualificationAppInfo( // to assign the same stageId to the current Exec as it's most likely that the current // Exec is part of the same stage as the neighbor Exec. val execInfosInOrder = execInfos.reverse - val execInfosCombined = Iterator(Seq(execInfosInOrder.head)) ++ - execInfosInOrder.sliding(3) ++ Iterator(Seq(execInfosInOrder.last)) - val perStageSum = execInfosCombined.map { - case Seq(prev, execInfo, next) => - val associatedStages = - if (execInfo.stages.size > 1) execInfo.stages.toSeq - else if (execInfo.stages.size < 1) { - val prevStages = if (prev.stages.size > 1) prev.stages else Seq.empty - val nextStages = if (next.stages.size > 1) next.stages else Seq.empty - val singlePrevStage = prev.stages.headOption.toSeq - val singleNextStage = next.stages.headOption.toSeq - val dedupedStages = Set(singlePrevStage ++ singleNextStage ++ prevStages ++ nextStages). - flatten.toSeq - if (dedupedStages.nonEmpty) { - dedupedStages - } else { - // we don't know what stage its in or its duration - logDebug(s"No stage associated with ${execInfo.exec} " + - s"so speedup factor isn't applied anywhere.") - execsWithoutStages += execInfo - Seq.empty - } - } else { - Seq(execInfo.stages.head) - } - associatedStages.map((_, execInfo)) - - case Seq(prev, execInfo) => - val associatedStages = - if (execInfo.stages.size > 1) execInfo.stages.toSeq - else if (execInfo.stages.size < 1) { - val prevStages = if (prev.stages.size > 1) prev.stages.toSeq else Seq.empty - val singlePrevStage = prev.stages.headOption.toSeq - val dedupedStages = Set(singlePrevStage ++ prevStages).flatten.toSeq - if (dedupedStages.nonEmpty) { - dedupedStages - } else { - // we don't know what stage its in or its duration - logDebug(s"No stage associated with ${execInfo.exec} " + - s"so speedup factor isn't applied anywhere.") - execsWithoutStages += execInfo - Seq.empty - } - } else { - Seq(execInfo.stages.head) - } - associatedStages.map((_, execInfo)) - - case Seq(execInfo) => - val associatedStages = - if (execInfo.stages.size > 1) execInfo.stages.toSeq - else if (execInfo.stages.size < 1) { - execsWithoutStages += execInfo - Seq.empty - } else { - Seq(execInfo.stages.head) - } - associatedStages.map((_, execInfo)) - - case _ => Seq.empty - }.toSeq.flatten.groupBy(_._1).map { case (stage, execInfos) => - (stage, execInfos.map(_._2)) + val execsToStageMap = execInfosInOrder.indices.map { + // corner case to handle first element + case 0 => if (execInfosInOrder.size > 1) { + // If there are more than one Execs, then check if the next Exec has a stageId. + doubleExecMatch(execInfosInOrder(1), execInfosInOrder(0)) + } else { + singleExecMatch(execInfosInOrder(0)) + } + // corner case to handle last element + case i if i == execInfosInOrder.size - 1 => + if (execInfosInOrder.size > 1) { + // If there are more than one Execs, then check if the previous Exec has a stageId. 
+ doubleExecMatch(execInfosInOrder(i - 1), execInfosInOrder(i)) + } else { + singleExecMatch(execInfosInOrder(i)) + } + case i => + tripleExecMatch(execInfosInOrder(i - 1), execInfosInOrder(i), execInfosInOrder(i + 1)) + case _ => + (Seq.empty, None) } + val perStageSum = execsToStageMap.map(_._1).toList.flatten + .groupBy(_._1).map { case (stage, execInfo) => + (stage, execInfo.map(_._2))} + + // Add all the execs that don't have a stageId to execsWithoutStages. + execsWithoutStages ++= execsToStageMap.map(_._2).toList.flatten + (perStageSum, execsWithoutStages) } diff --git a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv index 9d29c9106..f2fb0a114 100644 --- a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1626104300434","Not Recommended",1.01,129484.66,1619.33,2429,1469,131104,2429,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,0,1469,3.0,false,"CollectLimit","",30 +"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"CollectLimit","",30 \ No newline at end of file diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv index 76de7bc3a..84fe8ae97 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19216.48,6954.51,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.66,false,"Execute CreateViewCommand","",30 +"TPC-DS Like Bench 
q86","app-20210319163812-1778","Not Applicable",1.36,19230.84,6940.15,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30 diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv index e1dc2018c..d88fcb188 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation_persql.csv @@ -23,4 +23,4 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated "TPC-DS Like Bench q86","app-20210319163812-1778",17,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",8,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",23,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended" -"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2613.38,3.66,6951.61,"Not Applicable" +"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2627.74,3.64,6937.25,"Not Applicable" diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv index 985fa65d9..3cd646abf 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19216.48,6954.51,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.66,false,"Execute CreateViewCommand","",30 +"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19230.84,6940.15,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30 diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv index 67746ff8b..383bd4d73 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation_persql.csv @@ -1,5 +1,5 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated GPU Duration,Estimated GPU Speedup,Estimated GPU Time Saved,Recommendation -"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2613.38,3.66,6951.61,"Strongly Recommended" +"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; 
iteration=0",9565,9565,2627.74,3.64,6937.25,"Strongly Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",0,"Register input tables",2,2,2.0,1.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",21,"Register input tables",1,1,1.0,1.0,0.0,"Not Recommended" "TPC-DS Like Bench q86","app-20210319163812-1778",5,"Register input tables",1,1,1.0,1.0,0.0,"Not Recommended" diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv index 60ec0150e..e41039a69 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv @@ -1,5 +1,5 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 -"Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 -"Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1 -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,1,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4662,4,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8488.68,7830.31,12434,132257,16319,10577,37.7,"","","JSON","","","",7143,4717,19744,112513,3.85,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 +"Spark shell","local-1651187225439","Not Recommended",1.0,355550.33,86.66,760,180,355637,253,87.88,"","JSON[string:bigint:int]","","","","",498,343411,120,60,1.52,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 +"Spark shell","local-1651188809790","Not Recommended",1.0,166205.19,9.8,911,283,166215,38,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,271,12,1.34,false,"CollectLimit;Scan json;Project","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,2,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4661,5,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 
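The revised expectation values above follow from the neighbor lookup reworked earlier in this patch, which dispatches on the Exec's position so that the first and last Execs only consult the single neighbor they actually have. A minimal standalone sketch of that position-based neighbor selection, using only plain Scala collections (the function name `neighbors` and the example values are illustrative, not part of the tool's API):

def neighbors[A](xs: Seq[A]): Seq[(Option[A], A, Option[A])] = {
  xs.indices.map {
    // first element: only a "next" neighbor (None when the list has a single element)
    case 0 => (None, xs(0), xs.lift(1))
    // last element of a multi-element list: only a "prev" neighbor
    case i if i == xs.size - 1 => (Some(xs(i - 1)), xs(i), None)
    // everything in between: both neighbors
    case i => (Some(xs(i - 1)), xs(i), Some(xs(i + 1)))
  }
}

// For example:
// neighbors(Seq("Scan", "Filter", "Project")) ==
//   Seq((None, "Scan", Some("Filter")),
//       (Some("Scan"), "Filter", Some("Project")),
//       (Some("Filter"), "Project", None))
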
diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv index 33a0ceccd..c1f77372d 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation_persql.csv @@ -2,12 +2,12 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated "Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2078.49,3.43,5064.5,"Strongly Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,800.56,2.56,1251.43,"Strongly Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,763.96,2.53,1169.03,"Strongly Recommended" -"Spark shell","local-1651187225439",0,"show at :26",498,249,373.5,1.33,124.5,"Recommended" -"Spark shell","local-1651188809790",1,"show at :26",196,98,147.0,1.33,49.0,"Recommended" -"Spark shell","local-1651187225439",1,"show at :26",262,60,240.54,1.08,21.45,"Not Recommended" -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,187,1246.97,1.04,59.02,"Not Recommended" +"Spark shell","local-1651188809790",1,"show at :26",196,75,165.75,1.18,30.24,"Not Recommended" +"Spark shell","local-1651187225439",0,"show at :26",498,168,430.53,1.15,67.46,"Not Recommended" +"Spark shell","local-1651187225439",1,"show at :26",262,80,240.22,1.09,21.77,"Not Recommended" +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,164,1267.14,1.03,38.85,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,0,1209.0,1.0,0.0,"Not Recommended" -"Spark shell","local-1651188809790",0,"show at :26",715,2,715.0,1.0,0.0,"Not Recommended" +"Spark shell","local-1651188809790",0,"show at :26",715,5,715.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,0,321.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,0,129.0,1.0,0.0,"Not Recommended" "Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at QualificationInfoUtils.scala:136",127,0,127.0,1.0,0.0,"Not Recommended" diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv index c261e2db4..318ea6b28 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported 
Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371544219","Not Recommended",1.0,173961.11,1331.88,6695,20421,175293,2290,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13403,7018,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 +"Spark shell","local-1624371544219","Not Recommended",1.0,174006.51,1286.48,6695,20421,175293,2268,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13469,6952,2.31,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv index fbfaec015..17c155356 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371906627","Not Recommended",1.01,82225.67,1512.32,6760,21802,83738,2412,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,13987,7815,2.68,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 +"Spark shell","local-1624371906627","Not Recommended",1.01,82304.74,1433.25,6760,21802,83738,2388,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,14064,7738,2.5,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv index f2dbf947b..f300a45a5 100644 --- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1634253215009","Not Recommended",1.01,46516.07,546.92,1520,359,47063,842,67.64,"","Text[*]","","","","",1068,44935,160,199,2.85,false,"CollectLimit;Scan text","",30 +"Spark shell","local-1634253215009","Not Recommended",1.01,46542.98,520.01,1520,359,47063,817,67.64,"","Text[*]","","","","",1068,44935,166,193,2.75,false,"CollectLimit;Scan text","",30 diff --git 
a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv index 07d08a0c3..22394fe3a 100644 --- a/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.09,4468.98,403.01,1306,14353,4872,570,62.67,"","","JSON","","","",1306,4477,8086,6267,3.41,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 +"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.08,4479.65,392.34,1306,14353,4872,558,62.67,"","","JSON","","","",1306,4477,8214,6139,3.36,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30 diff --git a/core/src/test/resources/QualificationExpectations/write_format_expectation.csv b/core/src/test/resources/QualificationExpectations/write_format_expectation.csv index d18cf6f22..2ce60816e 100644 --- a/core/src/test/resources/QualificationExpectations/write_format_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/write_format_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1629442299891","Not Recommended",1.05,18558.0,996.0,1992,920,19554,1992,91.72,"","","CSV;JSON","","","",1235,16325,0,920,2.0,false,"Execute InsertIntoHadoopFsRelationCommand json;Execute InsertIntoHadoopFsRelationCommand csv","",30 +"Spark shell","local-1629442299891","Not Recommended",1.03,18871.95,682.04,1992,920,19554,1364,91.72,"","","CSV;JSON","","","",1235,16325,290,630,2.0,false,"Execute InsertIntoHadoopFsRelationCommand json;Execute InsertIntoHadoopFsRelationCommand csv","",30 From ff83687b69717bca021ef824fb39698aadd7d961 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 2 Nov 2023 14:30:49 -0700 Subject: [PATCH 3/6] address review comments and consolidate functions --- .../qualification/QualificationAppInfo.scala | 80 ++++--------------- 1 file changed, 16 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala 
b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 45e614376..560571a26 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -198,34 +198,16 @@ class QualificationAppInfo( } } - private def singleExecMatch(execInfo: ExecInfo): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { + private def checkStageIdInExec(prev: Option[ExecInfo], + execInfo: ExecInfo, next: Option[ExecInfo]): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { val associatedStages = { if (execInfo.stages.size > 1) { execInfo.stages.toSeq } else if (execInfo.stages.size < 1) { - // we don't know what stage its in or its duration - logDebug(s"No stage associated with ${execInfo.exec} " + - s"so speedup factor isn't applied anywhere.") - Seq.empty - } else { - Seq(execInfo.stages.head) - } - } - if (associatedStages.nonEmpty) { - (associatedStages.map((_, execInfo)), None) - } else { - (Seq.empty, Some(execInfo)) - } - } - - private def doubleExecMatch(neighbor: ExecInfo, execInfo: ExecInfo): ( - Seq[(Int, ExecInfo)], Option[ExecInfo]) = { - val associatedStages = { - if (execInfo.stages.size > 1) { - execInfo.stages.toSeq - } else if (execInfo.stages.size < 1) { - if (neighbor.stages.size >= 1) { - neighbor.stages.headOption.toSeq + if (prev.exists(_.stages.size >= 1)) { + prev.flatMap(_.stages.headOption).toSeq + } else if (next.exists(_.stages.size >= 1)) { + next.flatMap(_.stages.headOption).toSeq } else { // we don't know what stage its in or its duration logDebug(s"No stage associated with ${execInfo.exec} " + @@ -243,32 +225,6 @@ class QualificationAppInfo( } } - private def tripleExecMatch(prev: ExecInfo, execInfo: ExecInfo, next: ExecInfo): - (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { - val associatedStages = { - if (execInfo.stages.size > 1) { - execInfo.stages.toSeq - } else if (execInfo.stages.size < 1) { - if (prev.stages.size >= 1) { - prev.stages.headOption.toSeq - } else if (next.stages.size >= 1) { - next.stages.headOption.toSeq - } else { - // we don't know what stage its in or its duration - logDebug(s"No stage associated with ${execInfo.exec} " + - s"so speedup factor isn't applied anywhere.") - Seq.empty - } - } else { - Seq(execInfo.stages.head) - } - } - if (associatedStages.nonEmpty) { - (associatedStages.map((_, execInfo)), None) - } else { - (Seq.empty, Some(execInfo)) - } - } private def getStageToExec(execInfos: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = { val execsWithoutStages = new ArrayBuffer[ExecInfo]() @@ -285,29 +241,25 @@ class QualificationAppInfo( // corner case to handle first element case 0 => if (execInfosInOrder.size > 1) { // If there are more than one Execs, then check if the next Exec has a stageId. - doubleExecMatch(execInfosInOrder(1), execInfosInOrder(0)) + checkStageIdInExec(None, execInfosInOrder(0), Some(execInfosInOrder(1))) } else { - singleExecMatch(execInfosInOrder(0)) + checkStageIdInExec(None, execInfosInOrder(0), None) } // corner case to handle last element - case i if i == execInfosInOrder.size - 1 => - if (execInfosInOrder.size > 1) { - // If there are more than one Execs, then check if the previous Exec has a stageId. 
- doubleExecMatch(execInfosInOrder(i - 1), execInfosInOrder(i)) - } else { - singleExecMatch(execInfosInOrder(i)) - } + case i if i == execInfosInOrder.size - 1 && execInfosInOrder.size > 1 => + // If there are more than one Execs, then check if the previous Exec has a stageId. + checkStageIdInExec(Some(execInfosInOrder(i - 1)), execInfosInOrder(i), None) case i => - tripleExecMatch(execInfosInOrder(i - 1), execInfosInOrder(i), execInfosInOrder(i + 1)) - case _ => - (Seq.empty, None) + checkStageIdInExec(Some(execInfosInOrder(i - 1)), + execInfosInOrder(i), Some(execInfosInOrder(i + 1))) } val perStageSum = execsToStageMap.map(_._1).toList.flatten .groupBy(_._1).map { case (stage, execInfo) => - (stage, execInfo.map(_._2))} + (stage, execInfo.map(_._2)) + } // Add all the execs that don't have a stageId to execsWithoutStages. - execsWithoutStages ++= execsToStageMap.map(_._2).toList.flatten + execsWithoutStages ++= execsToStageMap.map(_._2).toList.flatten (perStageSum, execsWithoutStages) } From a94e898cc6ac3e799cb01bbe47e054526617e62d Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 2 Nov 2023 16:16:49 -0700 Subject: [PATCH 4/6] addressed review comments --- .../rapids/tool/qualification/QualificationAppInfo.scala | 8 +++----- .../QualificationExpectations/complex_dec_expectation.csv | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 560571a26..33cc5c23c 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -201,12 +201,12 @@ class QualificationAppInfo( private def checkStageIdInExec(prev: Option[ExecInfo], execInfo: ExecInfo, next: Option[ExecInfo]): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { val associatedStages = { - if (execInfo.stages.size > 1) { + if (execInfo.stages.size >= 1) { execInfo.stages.toSeq - } else if (execInfo.stages.size < 1) { + } else { if (prev.exists(_.stages.size >= 1)) { prev.flatMap(_.stages.headOption).toSeq - } else if (next.exists(_.stages.size >= 1)) { + } else if (next.nonEmpty) { next.flatMap(_.stages.headOption).toSeq } else { // we don't know what stage its in or its duration @@ -214,8 +214,6 @@ class QualificationAppInfo( s"so speedup factor isn't applied anywhere.") Seq.empty } - } else { - Seq(execInfo.stages.head) } } if (associatedStages.nonEmpty) { diff --git a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv index f2fb0a114..753bdcacf 100644 --- a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated 
Job Frequency (monthly) -"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"CollectLimit","",30 \ No newline at end of file +"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"CollectLimit","",30 From 3573d3ebc323b4f395700fdc37e4734ae2e91e8a Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 2 Nov 2023 17:16:56 -0700 Subject: [PATCH 5/6] optimize checks Signed-off-by: Niranjan Artal --- .../rapids/tool/qualification/QualificationAppInfo.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 33cc5c23c..170c5823f 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -201,12 +201,12 @@ class QualificationAppInfo( private def checkStageIdInExec(prev: Option[ExecInfo], execInfo: ExecInfo, next: Option[ExecInfo]): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { val associatedStages = { - if (execInfo.stages.size >= 1) { + if (execInfo.stages.nonEmpty) { execInfo.stages.toSeq } else { - if (prev.exists(_.stages.size >= 1)) { + if (prev.exists(_.stages.nonEmpty)) { prev.flatMap(_.stages.headOption).toSeq - } else if (next.nonEmpty) { + } else if (next.exists(_.stages.nonEmpty)) { next.flatMap(_.stages.headOption).toSeq } else { // we don't know what stage its in or its duration @@ -223,7 +223,6 @@ class QualificationAppInfo( } } - private def getStageToExec(execInfos: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = { val execsWithoutStages = new ArrayBuffer[ExecInfo]() From 672b26b4815eb484cf3b6b2870fd767a0e221c5d Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Fri, 3 Nov 2023 12:14:26 -0700 Subject: [PATCH 6/6] add documentation --- .../tool/qualification/QualificationAppInfo.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 170c5823f..61377d0b3 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -198,6 +198,21 @@ class QualificationAppInfo( } } + /** + * Checks the stage ID in the execution information. + * This function determines the associated stages for the given execution information by + * checking the stages in the current execution information, the previous execution information, + * and the next execution information. If there are associated stages, it returns a sequence of + * stage ID and execution information pairs. 
Otherwise, it returns an optional execution + * information (not associated with any stage). If there is a stage ID associated with both the + * previous and the next execution information, then the current execution information is + * associated with the stage ID of the previous execution information. + * @param prev The previous execution information. + * @param execInfo The current execution information. + * @param next The next execution information. + * @return A tuple containing a sequence of stage ID and execution information pairs, + * and an optional execution information. + */ private def checkStageIdInExec(prev: Option[ExecInfo], execInfo: ExecInfo, next: Option[ExecInfo]): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = { val associatedStages = {
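
Taken together, the end state of this series can be summarized by the following self-contained sketch. It assumes a stripped-down ExecInfo carrying only an exec name and the stage IDs already derived from its metric accumulators, and it assumes the input sequence is already in plan order (the tool itself reverses the exec list first); the case class, object, and sample plan below are illustrative only, not the tool's actual types:

case class ExecInfo(exec: String, stages: Set[Int])

object StageMappingSketch {
  // Prefer the exec's own stages; otherwise borrow the first stage of the
  // previous neighbor, then of the next one; otherwise report it as unmatched.
  def checkStageIdInExec(prev: Option[ExecInfo], execInfo: ExecInfo,
      next: Option[ExecInfo]): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = {
    val associatedStages =
      if (execInfo.stages.nonEmpty) {
        execInfo.stages.toSeq
      } else if (prev.exists(_.stages.nonEmpty)) {
        prev.flatMap(_.stages.headOption).toSeq
      } else if (next.exists(_.stages.nonEmpty)) {
        next.flatMap(_.stages.headOption).toSeq
      } else {
        Seq.empty
      }
    if (associatedStages.nonEmpty) (associatedStages.map((_, execInfo)), None)
    else (Seq.empty, Some(execInfo))
  }

  // Walk the execs once, pairing each one with its neighbors, then group the
  // (stageId, exec) pairs by stage and collect the execs that stayed unmatched.
  def getStageToExec(execs: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = {
    val matched = execs.indices.map { i =>
      val prev = if (i > 0) Some(execs(i - 1)) else None
      val next = if (i < execs.size - 1) Some(execs(i + 1)) else None
      checkStageIdInExec(prev, execs(i), next)
    }
    val perStage = matched.flatMap(_._1).groupBy(_._1)
      .map { case (stage, pairs) => (stage, pairs.map(_._2)) }
    (perStage, matched.flatMap(_._2))
  }

  def main(args: Array[String]): Unit = {
    val plan = Seq(
      ExecInfo("Scan parquet", Set(0)),
      ExecInfo("Project", Set.empty),        // no metrics: borrows stage 0 from Scan
      ExecInfo("Exchange", Set(0, 1)),
      ExecInfo("HashAggregate", Set.empty))  // borrows a stage from Exchange
    val (perStage, unmatched) = getStageToExec(plan)
    perStage.foreach { case (stage, execs) => println(s"stage $stage -> ${execs.map(_.exec)}") }
    println(s"unmatched: ${unmatched.map(_.exec)}")
  }
}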