Remove estimated GPU duration from qualification output (#1412)
* Remove estimated GPU duration from qualification output

Signed-off-by: Ahmed Hussein <[email protected]>

Fixes #1411

Removes two columns, `Estimated GPU Duration` and `Estimated GPU Time Saved`, from the core qualification output. The columns are dropped from two files: `rapids_4_spark_qualification_output.csv` and `rapids_4_spark_qualification_output_persql.csv`.
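
As a quick illustration (not part of this commit), here is a minimal Scala sketch of how one might verify that the two columns are gone from a generated report. The object name, the local file path, and the split-based header parsing are all assumptions for the sketch; the report header contains no embedded commas, so a plain split is enough.

```scala
import scala.io.Source

// Hypothetical check: read the header row of a qualification report and
// confirm the removed estimated-GPU columns are no longer present.
object CheckRemovedColumns {
  private val removedColumns =
    Seq("Estimated GPU Duration", "Estimated GPU Time Saved")

  def headerColumns(csvPath: String): Seq[String] = {
    val src = Source.fromFile(csvPath)
    // The report header has no embedded commas, so a plain split is sufficient.
    try src.getLines().next().split(",").map(_.trim).toSeq
    finally src.close()
  }

  def main(args: Array[String]): Unit = {
    val header = headerColumns("rapids_4_spark_qualification_output.csv")
    val leftover = removedColumns.filter(header.contains)
    if (leftover.isEmpty) println("OK: estimated GPU columns are gone")
    else println(s"Still present: ${leftover.mkString(", ")}")
  }
}
```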

---------

Signed-off-by: Ahmed Hussein <[email protected]>
amahussein authored Nov 5, 2024
1 parent 27ed846 commit 8eaeaa9
Showing 25 changed files with 136 additions and 229 deletions.
@@ -201,8 +201,7 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
sums: Seq[QualificationSummaryInfo]): Seq[EstimatedPerSQLSummaryInfo] = {
val estSumPerSql = sums.flatMap(_.perSQLEstimatedInfo).flatten
val sortedAsc = estSumPerSql.sortBy(sum => {
(sum.info.recommendation, sum.info.estimatedGpuSpeedup,
sum.info.estimatedGpuTimeSaved, sum.info.appDur, sum.info.appId)
(sum.info.gpuOpportunity, sum.info.appDur, sum.info.appId)
})
if (QualificationArgs.isOrderAsc(prettyPrintOrder)) {
sortedAsc
@@ -362,8 +361,6 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
case class FormattedQualificationSummaryInfo(
appName: String,
appId: String,
estimatedGpuDur: Double,
estimatedGpuTimeSaved: Double,
sqlDataframeDuration: Long,
sqlDataframeTaskDuration: Long,
appDuration: Long,
@@ -426,8 +423,6 @@ object QualOutputWriter {
val EXEC_CHILDREN = "Exec Children"
val EXEC_CHILDREN_NODE_IDS = "Exec Children Node Ids"
val GPU_OPPORTUNITY_STR = "GPU Opportunity"
val ESTIMATED_GPU_DURATION = "Estimated GPU Duration"
val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved"
val STAGE_ESTIMATED_STR = "Stage Estimated"
val NUM_TRANSITIONS = "Number of transitions from or to GPU"
val UNSUPPORTED_EXECS = "Unsupported Execs"
@@ -623,8 +618,6 @@ object QualOutputWriter {
val detailedHeadersAndFields = LinkedHashMap[String, Int](
APP_NAME_STR -> getMaxSizeForHeader(appInfos.map(_.appName.size), APP_NAME_STR),
APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
ESTIMATED_GPU_DURATION -> ESTIMATED_GPU_DURATION.size,
ESTIMATED_GPU_TIMESAVED -> ESTIMATED_GPU_TIMESAVED.size,
SQL_DUR_STR -> SQL_DUR_STR_SIZE,
TASK_DUR_STR -> TASK_DUR_STR.size,
APP_DUR_STR -> APP_DUR_STR_SIZE,
@@ -683,8 +676,6 @@ object QualOutputWriter {
APP_DUR_STR -> APP_DUR_STR_SIZE,
SQL_DUR_STR -> SQL_DUR_STR_SIZE,
GPU_OPPORTUNITY_STR -> GPU_OPPORTUNITY_STR_SIZE,
ESTIMATED_GPU_DURATION -> ESTIMATED_GPU_DURATION.size,
ESTIMATED_GPU_TIMESAVED -> ESTIMATED_GPU_TIMESAVED.size,
UNSUPPORTED_EXECS -> unSupExecMaxSize,
UNSUPPORTED_EXPRS -> unSupExprMaxSize,
ESTIMATED_FREQUENCY -> estimatedFrequencyMaxSize
@@ -716,10 +707,6 @@ object QualOutputWriter {
sumInfo.estimatedInfo.appDur.toString -> APP_DUR_STR_SIZE,
sumInfo.estimatedInfo.sqlDfDuration.toString -> SQL_DUR_STR_SIZE,
sumInfo.estimatedInfo.gpuOpportunity.toString -> GPU_OPPORTUNITY_STR_SIZE,
ToolUtils.formatDoublePrecision(sumInfo.estimatedInfo.estimatedGpuDur) ->
ESTIMATED_GPU_DURATION.size,
ToolUtils.formatDoublePrecision(sumInfo.estimatedInfo.estimatedGpuTimeSaved) ->
ESTIMATED_GPU_TIMESAVED.size,
sumInfo.estimatedInfo.unsupportedExecs -> unSupExecMaxSize,
sumInfo.estimatedInfo.unsupportedExprs -> unSupExprMaxSize,
sumInfo.estimatedFrequency.toString -> estimatedFrequencyMaxSize
@@ -750,9 +737,7 @@ object QualOutputWriter {
SQL_ID_STR -> SQL_ID_STR.size,
SQL_DESC_STR -> sqlDescLength,
SQL_DUR_STR -> SQL_DUR_STR_SIZE,
GPU_OPPORTUNITY_STR -> GPU_OPPORTUNITY_STR_SIZE,
ESTIMATED_GPU_DURATION -> ESTIMATED_GPU_DURATION.size,
ESTIMATED_GPU_TIMESAVED -> ESTIMATED_GPU_TIMESAVED.size
GPU_OPPORTUNITY_STR -> GPU_OPPORTUNITY_STR_SIZE
)
detailedHeadersAndFields
}
@@ -783,10 +768,7 @@ object QualOutputWriter {
reformatCSVFunc(formatSQLDescription(sumInfo.sqlDesc, maxSQLDescLength, delimiter)) ->
headersAndSizes(SQL_DESC_STR),
sumInfo.info.sqlDfDuration.toString -> SQL_DUR_STR_SIZE,
sumInfo.info.gpuOpportunity.toString -> GPU_OPPORTUNITY_STR_SIZE,
ToolUtils.formatDoublePrecision(sumInfo.info.estimatedGpuDur) -> ESTIMATED_GPU_DURATION.size,
ToolUtils.formatDoublePrecision(sumInfo.info.estimatedGpuTimeSaved) ->
ESTIMATED_GPU_TIMESAVED.size
sumInfo.info.gpuOpportunity.toString -> GPU_OPPORTUNITY_STR_SIZE
)
constructOutputRow(data, delimiter, prettyPrint)
}
@@ -1113,8 +1095,6 @@ object QualOutputWriter {
FormattedQualificationSummaryInfo(
appInfo.appName,
appInfo.appId,
ToolUtils.truncateDoubleToTwoDecimal(appInfo.estimatedInfo.estimatedGpuDur),
ToolUtils.truncateDoubleToTwoDecimal(appInfo.estimatedInfo.estimatedGpuTimeSaved),
appInfo.estimatedInfo.sqlDfDuration,
appInfo.sqlDataframeTaskDuration,
appInfo.estimatedInfo.appDur,
@@ -1150,8 +1130,6 @@ object QualOutputWriter {
val data = ListBuffer[(String, Int)](
reformatCSVFunc(appInfo.appName) -> headersAndSizes(APP_NAME_STR),
reformatCSVFunc(appInfo.appId) -> headersAndSizes(APP_ID_STR),
appInfo.estimatedGpuDur.toString -> ESTIMATED_GPU_DURATION.size,
appInfo.estimatedGpuTimeSaved.toString -> ESTIMATED_GPU_TIMESAVED.size,
appInfo.sqlDataframeDuration.toString -> headersAndSizes(SQL_DUR_STR),
appInfo.sqlDataframeTaskDuration.toString -> headersAndSizes(TASK_DUR_STR),
appInfo.appDuration.toString -> headersAndSizes(APP_DUR_STR),
@@ -85,8 +85,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
allAppsSum: Seq[QualificationSummaryInfo]): Seq[QualificationSummaryInfo] = {
// Default sorting for of the csv files. Use the endTime to break the tie.
allAppsSum.sortBy(sum => {
(sum.estimatedInfo.recommendation, sum.estimatedInfo.estimatedGpuSpeedup,
sum.estimatedInfo.estimatedGpuTimeSaved, sum.startTime + sum.estimatedInfo.appDur)
(sum.estimatedInfo.gpuOpportunity, sum.startTime + sum.estimatedInfo.appDur)
}).reverse
}

@@ -295,8 +295,7 @@ class RunningQualificationApp(
val wallClockDur = sqlInfo.duration.getOrElse(0L)
// get task duration ratio
val sqlStageSums = perSqlStageSummary.filter(_.sqlID == pInfo.sqlID)
val estimatedInfo = getPerSQLWallClockSummary(sqlStageSums, wallClockDur,
sqlIDtoFailures.get(pInfo.sqlID).nonEmpty, getAppName)
val estimatedInfo = getPerSQLWallClockSummary(sqlStageSums, wallClockDur, getAppName)
EstimatedPerSQLSummaryInfo(pInfo.sqlID, sqlInfo.rootExecutionID, pInfo.sqlDesc,
estimatedInfo)
}
@@ -181,12 +181,6 @@ class QualificationAppInfo(
all.map(_.unsupportedTaskDur).sum
}

private def calculateSpeedupFactor(all: Seq[StageQualSummaryInfo]): Double = {
val allSpeedupFactors = all.filter(_.stageTaskTime > 0).map(_.averageSpeedup)
val res = SQLPlanParser.averageSpeedup(allSpeedupFactors)
res
}

private def getAllReadFileFormats: Seq[String] = {
dataSourceInfo.map { ds =>
s"${ds.format.toLowerCase()}[${ds.schema}]"
@@ -542,8 +536,7 @@ class QualificationAppInfo(
val wallClockDur = info.duration.getOrElse(0L)
// get task duration ratio
val sqlStageSums = perSqlStageSummary.filter(_.sqlID == pInfo.sqlID)
val estimatedInfo = getPerSQLWallClockSummary(sqlStageSums, wallClockDur,
sqlIDtoFailures.get(pInfo.sqlID).nonEmpty, appName)
val estimatedInfo = getPerSQLWallClockSummary(sqlStageSums, wallClockDur, appName)
EstimatedPerSQLSummaryInfo(pInfo.sqlID, info.rootExecutionID, pInfo.sqlDesc,
estimatedInfo)
}
@@ -576,7 +569,6 @@ class QualificationAppInfo(
// note that these ratios are based off the stage times which may be missing some stage
// overhead or execs that didn't have associated stages
val supportedSQLTaskDuration = calculateSQLSupportedTaskDuration(allStagesSummary)
val taskSpeedupFactor = calculateSpeedupFactor(allStagesSummary)
// Get all the unsupported Execs from the plan
val unSupportedExecs = planInfos.flatMap { p =>
// WholeStageCodeGen is excluded from the result.
@@ -624,9 +616,8 @@ class QualificationAppInfo(
// set the format to match the output. Removing the truncation from here requires modifying
// TestQualificationSummary to truncate the same fields to match the CSV static samples.
val estimatedInfo = QualificationAppInfo.calculateEstimatedInfoSummary(appDurationNoOverhead,
sqlDfWallEstimatedRatio, supportedsqlDfWallEstimatedRatio, taskSpeedupFactor,
appDuration, appName, appId, sqlIdsWithFailures.nonEmpty, mlFuncReportInfo.speedup,
mlFuncReportInfo.mlWallClockDur, unSupportedExecs, unSupportedExprs, allClusterTagsMap)
sqlDfWallEstimatedRatio, supportedsqlDfWallEstimatedRatio,
appDuration, appName, appId, unSupportedExecs, unSupportedExprs, allClusterTagsMap)

val clusterSummary = ClusterSummary(info.appName, appId,
eventLogInfo.map(_.eventLog.toString), platform.clusterInfoFromEventLog,
@@ -637,7 +628,7 @@
notSupportFormatAndTypesString, getAllReadFileFormats, writeFormat,
allComplexTypes, nestedComplexTypes, longestSQLDuration, sqlDataframeTaskDuration,
nonSQLTaskDuration, unsupportedSQLTaskDuration, supportedSQLTaskDuration,
taskSpeedupFactor, info.sparkUser, info.startTime, estimatedInfo.sqlDfDuration,
info.sparkUser, info.startTime, estimatedInfo.sqlDfDuration,
origPlanInfos, origPlanInfosSummary.map(_.stageSum).flatten,
perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos,
unSupportedExecs, unSupportedExprs, clusterTags, allClusterTagsMap,
@@ -676,11 +667,10 @@ class QualificationAppInfo(
}

def getPerSQLWallClockSummary(sqlStageSums: Seq[SQLStageSummary], sqlDataFrameDuration: Long,
hasFailures: Boolean, appName: String): EstimatedAppInfo = {
appName: String): EstimatedAppInfo = {
val allStagesSummary = sqlStageSums.flatMap(_.stageSum)
val sqlDataframeTaskDuration = allStagesSummary.map(_.stageTaskTime).sum
val supportedSQLTaskDuration = calculateSQLSupportedTaskDuration(allStagesSummary)
val taskSpeedupFactor = calculateSpeedupFactor(allStagesSummary)

// since this is per sql output we are ignoring stages outside the sql so the
// ratio here should just be 1
@@ -702,8 +692,8 @@
val appDuration = appDurationNoOverhead

QualificationAppInfo.calculateEstimatedInfoSummary(appDurationNoOverhead,
sqlDfWallEstimatedRatio, supportedsqlDfWallEstimatedRatio, taskSpeedupFactor,
appDuration, appName, appId, hasFailures)
sqlDfWallEstimatedRatio, supportedsqlDfWallEstimatedRatio,
appDuration, appName, appId)
}

private def getAllSQLDurations: Seq[Long] = {
@@ -897,10 +887,6 @@ case class EstimatedAppInfo(
appDur: Long,
sqlDfDuration: Long,
gpuOpportunity: Long, // Projected opportunity for acceleration on GPU in ms
estimatedGpuDur: Double, // Predicted runtime for the app if it was run on the GPU
estimatedGpuSpeedup: Double, // app_duration / estimated_gpu_duration
estimatedGpuTimeSaved: Double, // app_duration - estimated_gpu_duration
recommendation: String,
unsupportedExecs: String,
unsupportedExprs: String,
allTagsMap: Map[String, String])
@@ -968,7 +954,6 @@ case class QualificationSummaryInfo(
nonSqlTaskDurationAndOverhead: Long,
unsupportedSQLTaskDuration: Long,
supportedSQLTaskDuration: Long,
taskSpeedupFactor: Double,
user: String,
startTime: Long,
sparkSqlDFWallClockDuration: Long,
@@ -1002,38 +987,21 @@ case class StageQualSummaryInfo(
unsupportedExecs: Seq[ExecInfo] = Seq.empty)

object QualificationAppInfo extends Logging {
// define recommendation constants
val RECOMMENDED = "Recommended"
val NOT_RECOMMENDED = "Not Recommended"
val STRONGLY_RECOMMENDED = "Strongly Recommended"
val NOT_APPLICABLE = "Not Applicable"
val LOWER_BOUND_RECOMMENDED = 1.3
val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5
// Below is the total time taken whenever there are ColumnarToRow or RowToColumnar transitions
// This includes the time taken to convert the data from one format to another and the time taken
// to transfer the data from CPU to GPU and vice versa. Current transfer rate is 1GB/s and is
// based on the testing on few candidate eventlogs.
val CPU_GPU_TRANSFER_RATE = 1000000000L

def getRecommendation(totalSpeedup: Double,
hasFailures: Boolean): String = {
if (hasFailures) {
NOT_APPLICABLE
} else if (totalSpeedup >= LOWER_BOUND_STRONGLY_RECOMMENDED) {
STRONGLY_RECOMMENDED
} else if (totalSpeedup >= LOWER_BOUND_RECOMMENDED) {
RECOMMENDED
} else {
NOT_RECOMMENDED
}
}

def calculateEstimatedInfoSummary(appDurationNoOverhead: Long,
def calculateEstimatedInfoSummary(
appDurationNoOverhead: Long,
sqlDfWallEstimatedRatio: Double,
supportedsqlDfWallEstimatedRatio: Double, taskSpeedupFactor: Double,
appDuration: Long, appName: String, appId: String,
hasFailures: Boolean, mlSpeedup: Double = 1.0, mlWallClockDur: Double = 0.0,
unSupportedExecs: String = "", unSupportedExprs: String = "",
supportedsqlDfWallEstimatedRatio: Double,
appDuration: Long,
appName: String,
appId: String,
unSupportedExecs: String = "",
unSupportedExprs: String = "",
allClusterTagsMap: Map[String, String] = Map.empty[String, String]): EstimatedAppInfo = {

// apply the task ratio we calculated above to the app duration wall clock to get
@@ -1044,27 +1012,6 @@ object QualificationAppInfo extends Logging {
// SQL and Dataframe operations
val supportedSqlDfWallDuration = appDurationNoOverhead * supportedsqlDfWallEstimatedRatio

// this includes the not supported SQL, nonSQL (excluding supported ml) and job overhead
// Note that the wall clock for ml here is not completely the same since using stage
// wall clock and not ratio of task time to app duration but good enough for now
val estWallClockDurNotOnGpuNoSupportedML = appDuration - supportedSqlDfWallDuration -
mlWallClockDur
val mlWallClockDurWithSpeedup = if (mlWallClockDur > 0) {
(mlWallClockDur / mlSpeedup)
} else {
0
}
// this is the new estimated wall clock time when we apply the GPU speedup factors
val estWallClockDurWithGpuAccel = (supportedSqlDfWallDuration / taskSpeedupFactor) +
estWallClockDurNotOnGpuNoSupportedML + mlWallClockDurWithSpeedup
val appDurSpeedupWithGPUAccel = if (appDuration == 0 || estWallClockDurWithGpuAccel == 0) {
0
} else {
appDuration / estWallClockDurWithGpuAccel
}
val estGpuTimeSaved = appDuration - estWallClockDurWithGpuAccel
val recommendation = getRecommendation(appDurSpeedupWithGPUAccel, hasFailures)

// truncate the double fields to double precision to ensure that unit-tests do not explicitly
// set the format to match the output. Removing the truncation from here requires modifying
// TestQualificationSummary to truncate the same fields to match the CSV static samples.
@@ -1073,10 +1020,6 @@ object QualificationAppInfo extends Logging {
appDuration,
sqlDfWallDuration.toLong,
supportedSqlDfWallDuration.toLong,
estWallClockDurWithGpuAccel,
appDurSpeedupWithGPUAccel,
estGpuTimeSaved,
recommendation,
unSupportedExecs,
unSupportedExprs,
allClusterTagsMap)
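
For context (not part of the diff), a small sketch of how the remaining `SQL DF Duration` and `GPU Opportunity` values are derived from the wall-clock ratios in `calculateEstimatedInfoSummary`. The object name and the numeric inputs are made up, and the `sqlDfWallDuration` formula is an assumption based on the ratio pattern shown in the surrounding code.

```scala
object GpuOpportunityExample {
  def main(args: Array[String]): Unit = {
    // Made-up inputs: app wall clock (ms) minus overhead, and the two wall-clock ratios.
    val appDurationNoOverhead = 120000L
    val sqlDfWallEstimatedRatio = 0.60          // all SQL/DataFrame work
    val supportedsqlDfWallEstimatedRatio = 0.45 // the GPU-supported portion

    // Assumed to follow the same ratio pattern as supportedSqlDfWallDuration below.
    val sqlDfWallDuration = appDurationNoOverhead * sqlDfWallEstimatedRatio
    // Shown in the diff: the portion of SQL/DataFrame wall clock supported on GPU.
    val supportedSqlDfWallDuration = appDurationNoOverhead * supportedsqlDfWallEstimatedRatio

    println(s"SQL DF Duration = ${sqlDfWallDuration.toLong} ms")          // 72000 ms
    println(s"GPU Opportunity = ${supportedSqlDfWallDuration.toLong} ms") // 54000 ms
  }
}
```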
@@ -1,2 +1,2 @@
App Name,App ID,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly), Total Core Seconds
"Spark shell","local-1626104300434",130285.11,818.88,1500,1469,131104,1315,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,1388,129598,181,1288,false,"CollectLimit","",30,1564
App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly), Total Core Seconds
"Spark shell","local-1626104300434",1500,1469,131104,1315,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,1388,129598,181,1288,false,"CollectLimit","",30,1564
@@ -1,2 +1,2 @@
App Name,App ID,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
"Spark shell","local-1623876083964",64013.8,69843.19,119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan;SerializeFromObject","",30,1599
App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds
"Spark shell","local-1623876083964",119353,1417661,133857,92667,91.14,"","","","","","",119903,143821,14504,316964,1100697,false,"Scan;SerializeFromObject","",30,1599