Skip to content

Commit

Permalink
Qualification tool: Enhance mapping of Execs to stages (#634)
Browse files Browse the repository at this point in the history
* Qualification tool: Enhance mapping of Execs to stages

Signed-off-by: Niranjan Artal <[email protected]>

* addressed review comments

* address review comments and consolidate functions

* addressed review comments

* optimize checks

Signed-off-by: Niranjan Artal <[email protected]>

* add documentation

---------

Signed-off-by: Niranjan Artal <[email protected]>
  • Loading branch information
nartal1 authored Nov 3, 2023
1 parent 67200e9 commit a5683c3
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,24 +198,82 @@ class QualificationAppInfo(
}
}

/**
* Checks the stage ID in the execution information.
* This function determines the associated stages for the given execution information by
* checking the stages in the current execution information, the previous execution information,
* and the next execution information. If there are associated stages, it returns a sequence of
* stage ID and execution information pairs. Otherwise, it returns an optional execution
* information(not associated with any stage). If there is stage ID associated with both the
* previous and the next execution information, then the current execution information is
* associated with the stage ID of the previous execution information.
* @param prev The previous execution information.
* @param execInfo The current execution information.
* @param next The next execution information.
* @return A tuple containing a sequence of stage ID and execution information pairs,
* and an optional execution information.
*/
private def checkStageIdInExec(prev: Option[ExecInfo],
execInfo: ExecInfo, next: Option[ExecInfo]): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = {
val associatedStages = {
if (execInfo.stages.nonEmpty) {
execInfo.stages.toSeq
} else {
if (prev.exists(_.stages.nonEmpty)) {
prev.flatMap(_.stages.headOption).toSeq
} else if (next.exists(_.stages.nonEmpty)) {
next.flatMap(_.stages.headOption).toSeq
} else {
// we don't know what stage its in or its duration
logDebug(s"No stage associated with ${execInfo.exec} " +
s"so speedup factor isn't applied anywhere.")
Seq.empty
}
}
}
if (associatedStages.nonEmpty) {
(associatedStages.map((_, execInfo)), None)
} else {
(Seq.empty, Some(execInfo))
}
}

private def getStageToExec(execInfos: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = {
val execsWithoutStages = new ArrayBuffer[ExecInfo]()
val perStageSum = execInfos.flatMap { execInfo =>
if (execInfo.stages.size > 1) {
execInfo.stages.map((_, execInfo))
} else if (execInfo.stages.size < 1) {
// we don't know what stage its in or its duration
logDebug(s"No stage associated with ${execInfo.exec} " +
s"so speedup factor isn't applied anywhere.")
execsWithoutStages += execInfo
Seq.empty

// This is to get the mapping between stageId and execs. This is primarily done based on
// accumulatorId. If an Exec has some metrics generated, then an accumulatorId will be
// generated for that Exec. This accumulatorId is used to get the stageId. If an Exec
// doesn't have any metrics, then we will try to get the stageId by looking at the
// neighbor Execs. If either of the neighbor Execs has a stageId, then we will use that
// to assign the same stageId to the current Exec as it's most likely that the current
// Exec is part of the same stage as the neighbor Exec.
val execInfosInOrder = execInfos.reverse
val execsToStageMap = execInfosInOrder.indices.map {
// corner case to handle first element
case 0 => if (execInfosInOrder.size > 1) {
// If there are more than one Execs, then check if the next Exec has a stageId.
checkStageIdInExec(None, execInfosInOrder(0), Some(execInfosInOrder(1)))
} else {
Seq((execInfo.stages.head, execInfo))
checkStageIdInExec(None, execInfosInOrder(0), None)
}
}.groupBy(_._1).map { case (stage, execInfos) =>
(stage, execInfos.map(_._2))
// corner case to handle last element
case i if i == execInfosInOrder.size - 1 && execInfosInOrder.size > 1 =>
// If there are more than one Execs, then check if the previous Exec has a stageId.
checkStageIdInExec(Some(execInfosInOrder(i - 1)), execInfosInOrder(i), None)
case i =>
checkStageIdInExec(Some(execInfosInOrder(i - 1)),
execInfosInOrder(i), Some(execInfosInOrder(i + 1)))
}
(perStageSum, execsWithoutStages.toSeq)
val perStageSum = execsToStageMap.map(_._1).toList.flatten
.groupBy(_._1).map { case (stage, execInfo) =>
(stage, execInfo.map(_._2))
}

// Add all the execs that don't have a stageId to execsWithoutStages.
execsWithoutStages ++= execsToStageMap.map(_._2).toList.flatten

(perStageSum, execsWithoutStages)
}

private def flattenedExecs(execs: Seq[ExecInfo]): Seq[ExecInfo] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","local-1626104300434","Not Recommended",1.01,129484.66,1619.33,2429,1469,131104,2429,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,128847,0,1469,3.0,false,"CollectLimit","",30
"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"CollectLimit","",30
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569439.65,2527.34,3627,19894,571967,3470,28.41,"","JDBC[*]","","","","",1812,544575,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19120.15,7050.84,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.8,false,"Execute CreateViewCommand","",30
"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19230.84,6940.15,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated
"TPC-DS Like Bench q86","app-20210319163812-1778",17,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended"
"TPC-DS Like Bench q86","app-20210319163812-1778",8,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended"
"TPC-DS Like Bench q86","app-20210319163812-1778",23,"Register input tables",0,0,0.0,0.0,0.0,"Not Recommended"
"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2517.1,3.79,7047.89,"Not Applicable"
"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2627.74,3.64,6937.25,"Not Applicable"
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19120.15,7050.84,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.8,false,"Execute CreateViewCommand","",30
"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19230.84,6940.15,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated GPU Duration,Estimated GPU Speedup,Estimated GPU Time Saved,Recommendation
"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2517.1,3.79,7047.89,"Strongly Recommended"
"TPC-DS Like Bench q86","app-20210319163812-1778",24,"Benchmark Run: query=q86; iteration=0",9565,9565,2627.74,3.64,6937.25,"Strongly Recommended"
"TPC-DS Like Bench q86","app-20210319163812-1778",0,"Register input tables",2,2,2.0,1.0,0.0,"Not Recommended"
"TPC-DS Like Bench q86","app-20210319163812-1778",21,"Register input tables",1,1,1.0,1.0,0.0,"Not Recommended"
"TPC-DS Like Bench q86","app-20210319163812-1778",5,"Register input tables",1,1,1.0,1.0,0.0,"Not Recommended"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1
"Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1
"Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,0,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4664,2,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8488.68,7830.31,12434,132257,16319,10577,37.7,"","","JSON","","","",7143,4717,19744,112513,3.85,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1
"Spark shell","local-1651187225439","Not Recommended",1.0,355550.33,86.66,760,180,355637,253,87.88,"","JSON[string:bigint:int]","","","","",498,343411,120,60,1.52,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1
"Spark shell","local-1651188809790","Not Recommended",1.0,166205.19,9.8,911,283,166215,38,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,271,12,1.34,false,"CollectLimit;Scan json;Project","UDF",1
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,2,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4661,5,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ App Name,App ID,SQL ID,SQL Description,SQL DF Duration,GPU Opportunity,Estimated
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",1,"count at QualificationInfoUtils.scala:94",7143,6719,2078.49,3.43,5064.5,"Strongly Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",3,"count at QualificationInfoUtils.scala:94",2052,1660,800.56,2.56,1251.43,"Strongly Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",2,"count at QualificationInfoUtils.scala:94",1933,1551,763.96,2.53,1169.03,"Strongly Recommended"
"Spark shell","local-1651187225439",0,"show at <console>:26",498,249,373.5,1.33,124.5,"Recommended"
"Spark shell","local-1651188809790",1,"show at <console>:26",196,98,147.0,1.33,49.0,"Recommended"
"Spark shell","local-1651187225439",1,"show at <console>:26",262,60,240.54,1.08,21.45,"Not Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,187,1246.97,1.04,59.02,"Not Recommended"
"Spark shell","local-1651188809790",1,"show at <console>:26",196,75,165.75,1.18,30.24,"Not Recommended"
"Spark shell","local-1651187225439",0,"show at <console>:26",498,168,430.53,1.15,67.46,"Not Recommended"
"Spark shell","local-1651187225439",1,"show at <console>:26",262,80,240.22,1.09,21.77,"Not Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018",0,"json at QualificationInfoUtils.scala:76",1306,164,1267.14,1.03,38.85,"Not Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",0,"json at QualificationInfoUtils.scala:130",1209,0,1209.0,1.0,0.0,"Not Recommended"
"Spark shell","local-1651188809790",0,"show at <console>:26",715,2,715.0,1.0,0.0,"Not Recommended"
"Spark shell","local-1651188809790",0,"show at <console>:26",715,5,715.0,1.0,0.0,"Not Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",2,"json at QualificationInfoUtils.scala:136",321,0,321.0,1.0,0.0,"Not Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",5,"json at QualificationInfoUtils.scala:136",129,0,129.0,1.0,0.0,"Not Recommended"
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390",8,"json at QualificationInfoUtils.scala:136",127,0,127.0,1.0,0.0,"Not Recommended"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","local-1624371544219","Not Recommended",1.0,174691.42,601.57,6695,20421,175293,1034,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175857,17266,3155,2.39,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30
"Spark shell","local-1624371544219","Not Recommended",1.0,174006.51,1286.48,6695,20421,175293,2268,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13469,6952,2.31,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","local-1624371906627","Not Recommended",1.0,83172.84,565.15,6760,21802,83738,971,71.3,"","Text[*];json[double]","JSON","","","",1984,82505,18668,3134,2.39,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30
"Spark shell","local-1624371906627","Not Recommended",1.01,82304.74,1433.25,6760,21802,83738,2388,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,14064,7738,2.5,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","local-1634253215009","Not Recommended",1.01,46352.24,710.75,1520,359,47063,1011,67.64,"","Text[*]","","","","",1068,44935,120,239,3.36,false,"CollectLimit;Scan text","",30
"Spark shell","local-1634253215009","Not Recommended",1.01,46542.98,520.01,1520,359,47063,817,67.64,"","Text[*]","","","","",1068,44935,166,193,2.75,false,"CollectLimit;Scan text","",30
Loading

0 comments on commit a5683c3

Please sign in to comment.