Skip to content

Commit

Permalink
Qualification tool: Add penalty for row conversions (#471)
Browse files Browse the repository at this point in the history
* Qualification tool: Add penalty for row conversions

Signed-off-by: Niranjan Artal <[email protected]>

* optimize code

* addressed review comments

* addressed review comments and added test

Signed-off-by: Niranjan Artal <[email protected]>

* fix unit test

* addressed review comments

* addressed review comments and updated test results

Signed-off-by: Niranjan Artal <[email protected]>

* Address review comments

Signed-off-by: Niranjan Artal <[email protected]>

* address review comments

* update tests

* Revert "update tests"

This reverts commit 7e1ebe0.

* add penalty to durations

* change transitiontime calculation

* update variable name

* addressed review comments

* change penalty percentage

---------

Signed-off-by: Niranjan Artal <[email protected]>
  • Loading branch information
nartal1 authored Oct 16, 2023
1 parent 5fd7299 commit 89361b1
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ object QualOutputWriter {
val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup"
val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved"
val STAGE_ESTIMATED_STR = "Stage Estimated"
val NUM_TRANSITIONS = "Number of transitions from or to GPU"
val UNSUPPORTED_EXECS = "Unsupported Execs"
val UNSUPPORTED_EXPRS = "Unsupported Expressions"
val CLUSTER_TAGS = "Cluster Tags"
Expand Down Expand Up @@ -856,7 +857,8 @@ object QualOutputWriter {
AVERAGE_SPEEDUP_STR -> AVERAGE_SPEEDUP_STR.size,
STAGE_DUR_STR -> STAGE_DUR_STR.size,
UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size,
STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size
STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size,
NUM_TRANSITIONS -> NUM_TRANSITIONS.size
)
detailedHeadersAndFields
}
Expand All @@ -878,7 +880,8 @@ object QualOutputWriter {
headersAndSizes(AVERAGE_SPEEDUP_STR),
info.stageTaskTime.toString -> headersAndSizes(STAGE_DUR_STR),
info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR),
info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR))
info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR),
info.numTransitions.toString -> headersAndSizes(NUM_TRANSITIONS))
constructOutputRow(data, delimiter, prettyPrint)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
timeout: Option[Long], nThreads: Int, order: String,
pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean) extends Logging {
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
penalizeTransitions: Boolean) extends Logging {

private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

Expand Down Expand Up @@ -166,7 +167,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
try {
val startTime = System.currentTimeMillis()
val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
reportSqlLevel, mlOpsEnabled)
reportSqlLevel, mlOpsEnabled, penalizeTransitions)
val qualAppResult = appResult match {
case Left(errorMessage: String) =>
// Case when an error occurred during QualificationAppInfo creation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
opt[Boolean](required = false,
descr = "Whether to parse ML functions in the eventlogs. Default is false.",
default = Some(false))
// Boolean toggle controlling whether ColumnarToRow/RowToColumnar transitions are penalized.
// It defaults to true; with the "no-" prefix the negated flag is presumably spelled
// `--no-penalize-transitions` (verify against the Scallop version in use).
val penalizeTransitions: ScallopOption[Boolean] =
  toggle("penalize-transitions",
    descrYes = "Add penalty for ColumnarToRow and RowToColumnar transitions. " +
      "Enabled by default.",
    descrNo = "Do not add penalty for ColumnarToRow and RowToColumnar transitions.",
    prefix = "no-",
    default = Some(true))
val sparkProperty: ScallopOption[List[String]] =
opt[List[String]](required = false,
descr = "Filter applications based on certain Spark properties that were set during " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object QualificationMain extends Logging {
val reportSqlLevel = appArgs.perSql.getOrElse(false)
val platform = appArgs.platform.getOrElse("onprem")
val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false)
val penalizeTransitions = appArgs.penalizeTransitions.getOrElse(true)

val hadoopConf = RapidsToolsConfUtil.newHadoopConf

Expand Down Expand Up @@ -93,7 +94,7 @@ object QualificationMain extends Logging {

val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout,
nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled,
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled)
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions)
val res = qual.qualifyApps(filteredLogs)
(0, res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.rapids.tool.qualification

import java.util.concurrent.TimeUnit

import scala.collection.mutable.{ArrayBuffer, HashMap}

import com.nvidia.spark.rapids.tool.EventLogInfo
Expand All @@ -37,7 +39,8 @@ class QualificationAppInfo(
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
perSqlOnly: Boolean = false,
mlOpsEnabled: Boolean = false)
mlOpsEnabled: Boolean = false,
penalizeTransitions: Boolean = true)
extends AppBase(eventLogInfo, hadoopConf) with Logging {

var appId: String = ""
Expand All @@ -51,6 +54,7 @@ class QualificationAppInfo(
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] =
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int]

val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]]
Expand Down Expand Up @@ -147,8 +151,10 @@ class QualificationAppInfo(

// Look at the total task times for all jobs/stages that aren't SQL or
// SQL but dataset or rdd
private def calculateNonSQLTaskDataframeDuration(taskDFDuration: Long): Long = {
val allTaskTime = stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum
private def calculateNonSQLTaskDataframeDuration(
taskDFDuration: Long,
totalTransitionTime: Long): Long = {
val allTaskTime = stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum + totalTransitionTime
val res = allTaskTime - taskDFDuration
assert(res >= 0)
res
Expand Down Expand Up @@ -241,13 +247,87 @@ class QualificationAppInfo(
stages.map { stageId =>
val stageTaskTime = stageIdToTaskEndSum.get(stageId)
.map(_.totalTaskDuration).getOrElse(0L)
val numTransitions = penalizeTransitions match {
case true => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
case false => 0
}
val transitionsTime = numTransitions match {
case 0 => 0L // no transitions
case gpuCpuTransitions =>
// Duration to transfer data from GPU to CPU and vice versa.
// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
// Duration in Spark metrics is in milliseconds and CPU-GPU transfer rate is in bytes/sec.
// So we need to convert the transitions time to milliseconds.
val totalBytesRead = {
stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L)
}
if (totalBytesRead > 0) {
val transitionTime = (totalBytesRead /
QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * gpuCpuTransitions
(transitionTime * 1000).toLong // convert to milliseconds
} else {
0L
}

case _ => 0L
}
val finalEachStageUnsupported = if (transitionsTime != 0) {
// Add 50% penalty for unsupported duration if there are transitions. This number
// was chosen empirically because it roughly matched what we saw in the experiments
// with customer/nds event logs
(eachStageUnsupported * 0.5 + eachStageUnsupported).toLong
} else {
eachStageUnsupported
}

StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
eachStageUnsupported, estimated)
finalEachStageUnsupported, numTransitions, transitionsTime, estimated)
}.toSet
}

/**
 * For every stage, counts how many adjacent pairs of execs switch between supported and
 * unsupported, and stores that count in `stageIdToGpuCpuTransitions` under the stage id.
 *
 * @param allStagesToExecs execs grouped by the id of the stage they belong to
 */
private def calculateNumberOfTransitions(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Unit = {
  for ((stageId, stageExecs) <- allStagesToExecs) {
    // Replace every WholeStageCodegen node by its children (nothing if it has none), keep
    // every other exec untouched, and reverse the resulting sequence.
    // Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange
    // will be flattened to Exchange;Sort;Exchange;Sort;SortMergeJoin;SortMergeJoin;Exchange;
    val flattened = stageExecs.flatMap { execInfo =>
      if (execInfo.exec.startsWith("WholeStage")) {
        execInfo.children.getOrElse(Seq.empty)
      } else {
        Seq(execInfo)
      }
    }.reverse

    // If it's a shuffle stage, keep the first and the last exec as they are and drop every
    // Exchange in between, as input size is captured in the Exchange node.
    // Sequences of at most two execs have no interior and are used unchanged.
    val withoutInnerExchanges =
      if (flattened.size <= 2) {
        flattened
      } else {
        val interior = flattened.tail.init.filterNot(_.exec == "Exchange")
        (flattened.head +: interior) :+ flattened.last
      }

    // Pair each exec with its successor by zipping the sequence with itself minus its first
    // element. Example: (ScanExec, FilterExec, SortExec, ProjectExec) yields
    // (ScanExec, FilterExec), (FilterExec, SortExec), (SortExec, ProjectExec).
    val adjacentPairs = withoutInnerExchanges.zip(withoutInnerExchanges.drop(1))

    // A pair is a transition when exactly one of its two execs is supported.
    val flips = adjacentPairs.count { case (current, next) =>
      current.isSupported != next.isSupported
    }
    stageIdToGpuCpuTransitions.update(stageId, flips)
  }
}

def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)

// Get the total number of transitions between CPU and GPU for each stage and
// store it in a Map.
calculateNumberOfTransitions(allStagesToExecs)

if (allStagesToExecs.isEmpty) {
// use job level
// also get the job ids associated with the SQLId
Expand Down Expand Up @@ -302,7 +382,7 @@ class QualificationAppInfo(
val numUnsupportedExecs = execInfos.filterNot(_.isSupported).size
// This is a guesstimate at how much wall clock was supported
val numExecs = execInfos.size.toDouble
val numSupportedExecs = (numExecs - numUnsupportedExecs).toDouble
val numSupportedExecs = (numExecs - numUnsupportedExecs)
val ratio = numSupportedExecs / numExecs
val estimateWallclockSupported = (sqlWallClockDuration * ratio).toInt
// don't worry about supported execs for these as they are mostly indicators of I/O
Expand Down Expand Up @@ -428,11 +508,12 @@ class QualificationAppInfo(
val allStagesSummary = perSqlStageSummary.flatMap(_.stageSum)
.map(sum => sum.stageId -> sum).toMap.values.toSeq
val sqlDataframeTaskDuration = allStagesSummary.map(s => s.stageTaskTime).sum
val totalTransitionsTime = allStagesSummary.map(s => s.transitionTime).sum
val unsupportedSQLTaskDuration = calculateSQLUnsupportedTaskDuration(allStagesSummary)
val endDurationEstimated = this.appEndTime.isEmpty && appDuration > 0
val jobOverheadTime = calculateJobOverHeadTime(info.startTime)
val nonSQLDataframeTaskDuration =
calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration)
calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration, totalTransitionsTime)
val nonSQLTaskDuration = nonSQLDataframeTaskDuration + jobOverheadTime
// note that these ratios are based off the stage times which may be missing some stage
// overhead or execs that didn't have associated stages
Expand Down Expand Up @@ -488,8 +569,11 @@ class QualificationAppInfo(
}

// get the ratio based on the Task durations that we will use for wall clock durations
// totalTransitionsTime is the overhead time for ColumnarToRow/RowToColumnar transitions
// which impacts the GPU ratio.
val estimatedGPURatio = if (sqlDataframeTaskDuration > 0) {
supportedSQLTaskDuration.toDouble / sqlDataframeTaskDuration.toDouble
supportedSQLTaskDuration.toDouble / (
sqlDataframeTaskDuration.toDouble + totalTransitionsTime.toDouble)
} else {
1
}
Expand Down Expand Up @@ -670,7 +754,8 @@ class StageTaskQualificationSummary(
val stageAttemptId: Int,
var executorRunTime: Long,
var executorCPUTime: Long,
var totalTaskDuration: Long)
var totalTaskDuration: Long,
var totalbytesRead: Long)

case class QualApplicationInfo(
appName: String,
Expand Down Expand Up @@ -736,6 +821,8 @@ case class StageQualSummaryInfo(
averageSpeedup: Double,
stageTaskTime: Long,
unsupportedTaskDur: Long,
numTransitions: Int,
transitionTime: Long,
estimated: Boolean = false)

object QualificationAppInfo extends Logging {
Expand All @@ -746,6 +833,11 @@ object QualificationAppInfo extends Logging {
val NOT_APPLICABLE = "Not Applicable"
val LOWER_BOUND_RECOMMENDED = 1.3
val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5
// Assumed effective rate, in bytes per second, used to estimate the time taken whenever there
// are ColumnarToRow or RowToColumnar transitions. It is meant to cover both the time taken to
// convert the data from one format to another and the time taken to transfer the data from CPU
// to GPU and vice versa. Current transfer rate is 1GB/s and is based on testing with a few
// candidate eventlogs.
val CPU_GPU_TRANSFER_RATE = 1000000000L

private def handleException(e: Exception, path: EventLogInfo): String = {
val message: String = e match {
Expand Down Expand Up @@ -838,10 +930,11 @@ object QualificationAppInfo extends Logging {
hadoopConf: Configuration,
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
mlOpsEnabled: Boolean): Either[String, QualificationAppInfo] = {
mlOpsEnabled: Boolean,
penalizeTransitions: Boolean): Either[String, QualificationAppInfo] = {
try {
val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker,
reportSqlLevel, false, mlOpsEnabled)
reportSqlLevel, false, mlOpsEnabled, penalizeTransitions)
logInfo(s"${path.eventLog.toString} has App: ${app.appId}")
Right(app)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,26 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
super.doSparkListenerTaskEnd(app, event)
// keep all stage task times to see for nonsql duration
val taskSum = app.stageIdToTaskEndSum.getOrElseUpdate(event.stageId, {
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
taskSum.totalTaskDuration += event.taskInfo.duration
// Add the total bytes read from the task if it's available. This is from inputMetrics if
// it is reading from datasource, or shuffleReadMetrics if it is reading from shuffle.
val inputMetrics = event.taskMetrics.inputMetrics
if (inputMetrics != null) {
taskSum.totalbytesRead += inputMetrics.bytesRead
}
val shuffleReadMetrics = event.taskMetrics.shuffleReadMetrics
if (shuffleReadMetrics != null) {
taskSum.totalbytesRead += shuffleReadMetrics.totalBytesRead
}

// Adds in everything (including failures)
app.stageIdToSqlID.get(event.stageId).foreach { sqlID =>
val taskSum = app.sqlIDToTaskEndSum.getOrElseUpdate(sqlID, {
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class SQLPlanParserSuite extends BaseTestSuite {
val pluginTypeChecker = new PluginTypeChecker()
assert(allEventLogs.size == 1)
val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf,
pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false)
pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true)
appResult match {
case Right(app) => app
case Left(_) => throw new AssertionError("Cannot create application")
Expand Down Expand Up @@ -217,6 +217,51 @@ class SQLPlanParserSuite extends BaseTestSuite {
}
}

// Generates an event log for a cube/count query, parses its single SQL plan and checks that
// expanding each WholeStageCodegen node into its children and reversing the result lists the
// execs in the expected order.
test("Parse Execs within WholeStageCodeGen in Order") {
  TrampolineUtil.withTempDir { eventLogDir =>
    val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,
      "Execs within WSCG ") { spark =>
      import spark.implicits._
      val df = Seq(("foo", 1L, 1.2), ("foo", 2L, 2.2), ("bar", 2L, 3.2),
        ("bar", 2L, 4.2)).toDF("x", "y", "z")
      df.cube($"x", ceil($"y")).count
    }
    val pluginTypeChecker = new PluginTypeChecker()
    val app = createAppFromEventlog(eventLog)
    assert(app.sqlPlans.size == 1)
    // The expected exec names differ by Spark version, so check the version once up front.
    val isSpark320OrLater = ToolUtils.isSpark320OrLater()
    app.sqlPlans.foreach { case (sqlID, plan) =>
      val planInfo = SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "",
        pluginTypeChecker, app)
      val allExecInfo = planInfo.execInfo
      // TODO(review): an expected top-level exec count used to be computed here but was never
      // asserted (5 on Spark 3.2+: AdaptiveSparkPlan, WholeStageCodegen, AQEShuffleRead,
      // Exchange, WholeStageCodegen; 3 otherwise: WholeStageCodegen, Exchange,
      // WholeStageCodegen). Confirm those numbers and assert allExecInfo.size if they hold.
      val wholeStages = allExecInfo.filter(_.exec.contains("WholeStageCodegen"))
      assert(wholeStages.size == 2)
      // Expanding the children of WholeStageCodegen
      val allExecs = allExecInfo.flatMap { execInfo =>
        if (execInfo.exec.startsWith("WholeStage")) {
          execInfo.children.getOrElse(Seq.empty)
        } else {
          Seq(execInfo)
        }
      }.reverse
      val expectedOrder = if (isSpark320OrLater) {
        // Order should be: LocalTableScan, Expand, HashAggregate, Exchange,
        // AQEShuffleRead, HashAggregate, AdaptiveSparkPlan
        Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "AQEShuffleRead",
          "HashAggregate", "AdaptiveSparkPlan")
      } else {
        // Order should be: LocalTableScan, Expand, HashAggregate, Exchange, HashAggregate
        Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "HashAggregate")
      }
      assert(allExecs.map(_.exec) == expectedOrder)
    }
  }
}

test("HashAggregate") {
TrampolineUtil.withTempDir { eventLogDir =>
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,
Expand Down

0 comments on commit 89361b1

Please sign in to comment.