Qualification tool: Add penalty for row conversions #471

Merged: 25 commits, Oct 16, 2023

Changes shown below are from 3 commits.

Commits (25):
5008ad8  Qualification tool: Add penalty for row conversions (nartal1, Aug 1, 2023)
a32d81c  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into apple… (nartal1, Aug 1, 2023)
af962b8  optimize code (nartal1, Aug 2, 2023)
1ae0eaa  addressed review comments (nartal1, Aug 8, 2023)
7da1acf  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Aug 14, 2023)
e2dbb12  addressed review comments and added test (nartal1, Aug 18, 2023)
29f0e8c  fix unit test (nartal1, Aug 22, 2023)
a52803a  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Aug 22, 2023)
7b6803f  addressed review comments (nartal1, Aug 23, 2023)
2607489  addressed review comments and updated test results (nartal1, Aug 28, 2023)
f8a867f  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Sep 19, 2023)
0c5b342  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Sep 22, 2023)
89fccf4  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Oct 8, 2023)
2ba211a  Address review comments (nartal1, Oct 9, 2023)
5e06370  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Oct 9, 2023)
8dfa245  address review comments (nartal1, Oct 10, 2023)
7e1ebe0  update tests (nartal1, Oct 11, 2023)
132605f  Revert "update tests" (nartal1, Oct 11, 2023)
f608365  add penalty to durations (nartal1, Oct 12, 2023)
f8c8d40  change transitiontime calculation (nartal1, Oct 13, 2023)
db6eaf0  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Oct 13, 2023)
36f8d56  update variable name (nartal1, Oct 13, 2023)
4a8ab84  addressed review comments (nartal1, Oct 13, 2023)
b17df32  Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g… (nartal1, Oct 14, 2023)
052b22f  change penaly percentage (nartal1, Oct 16, 2023)
@@ -51,6 +51,7 @@ class QualificationAppInfo(
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] =
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int]

val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]]
@@ -241,13 +242,49 @@ class QualificationAppInfo(
stages.map { stageId =>
val stageTaskTime = stageIdToTaskEndSum.get(stageId)
.map(_.totalTaskDuration).getOrElse(0L)
val transitionsTime = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) match {
case gpuCpuTransitions if gpuCpuTransitions > 0 =>
// Duration to transfer data from GPU to CPU and vice versa.
// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
// Duration in Spark metrics is in millisecond, so multiply this by 1000 to make
// it consistent
Reviewer: Expand the comment; it's kind of left hanging. Make it consistent between what two things?

Author (nartal1): Updated the comment.

val totalBytesRead =
stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L).toDouble
if (totalBytesRead > 0) {
val fallback_duration = (totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) *
QualificationAppInfo.SECONDS_TO_MILLISECONDS * gpuCpuTransitions
Reviewer: There is a toMillis (and others) in java.util.concurrent.TimeUnit.

Author (nartal1): Done.

fallback_duration.toLong
} else {
0L
}
case _ => 0L
}
// Update totaltaskduration of stageIdToTaskEndSum to include transitions time
val stageIdToTasksMetrics = stageIdToTaskEndSum.get(stageId).orElse(None)
if (stageIdToTasksMetrics.isDefined) {
stageIdToTasksMetrics.get.totalTaskDuration += transitionsTime
Reviewer: I'm confused by this. Why are we changing the task durations here? This has traditionally held the real task durations, and we add/remove things later. Is this because supported + unsupported is now longer due to the transition time? It seems odd to change it in this data structure, which holds the real values from the file.

Author (nartal1): This is done because we are adding transitionTime to unsupportedTaskDuration, i.e. unsupportedTaskDuration = eachStageUnsupported + transitionsTime. So totalTaskDuration should also include transitionTime, otherwise we could end up with unsupportedTaskDuration > totalTaskDuration, which would be incorrect.

Author (nartal1), Aug 28, 2023: Updated this code. Now we consider transitionTime in the unsupported durations only. stageTaskTime is the totalTaskDuration from the event log. transitionTime is returned from StageQualSummaryInfo so that it can be used in calculateNonSQLTaskDataframeDuration.
}
StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
-        eachStageUnsupported, estimated)
+        eachStageUnsupported + transitionsTime, estimated)
Reviewer: It might be nice to report the number of transitions we expect in the qual tool stage output; the idea behind that output was to be able to debug or figure out why we came out with a certain recommendation number.

Author (nartal1): Thanks for the suggestion. Added a column for the number of transitions in the qual tool stage output.

}.toSet
}
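For context, here is a minimal, self-contained sketch of the penalty the hunk above computes: the bytes a stage reads, divided by an assumed CPU-GPU transfer rate, converted to milliseconds (using java.util.concurrent.TimeUnit, as the review thread suggests), and multiplied by the number of CPU-GPU transitions. The object name and the estimateTransitionPenaltyMs helper are illustrative only, not part of the PR.

```scala
import java.util.concurrent.TimeUnit

object TransitionPenaltySketch {
  // Assumed transfer rate in bytes/second (mirrors QualificationAppInfo.CPU_GPU_TRANSFER_RATE,
  // a deliberately conservative value that allows for some of the result spilling to disk).
  val CPU_GPU_TRANSFER_RATE = 10000000L

  // Estimated milliseconds spent moving a stage's data between CPU and GPU:
  // seconds for one transfer of totalBytesRead, converted to millis, times the transition count.
  def estimateTransitionPenaltyMs(totalBytesRead: Long, gpuCpuTransitions: Int): Long = {
    if (totalBytesRead > 0 && gpuCpuTransitions > 0) {
      val secondsPerTransfer = totalBytesRead.toDouble / CPU_GPU_TRANSFER_RATE
      (TimeUnit.SECONDS.toMillis(1) * secondsPerTransfer * gpuCpuTransitions).toLong
    } else {
      0L
    }
  }

  def main(args: Array[String]): Unit = {
    // Example: 1 GB read with 2 CPU<->GPU transitions
    // => (1e9 / 1e7) s * 1000 ms/s * 2 = 200,000 ms added as penalty.
    println(estimateTransitionPenaltyMs(1000000000L, 2))
  }
}
```

As the later comments note, the final version of the PR folds this penalty into the unsupported task duration only, leaving the raw totalTaskDuration from the event log untouched.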

def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)

// Get the total number of transitions between CPU and GPU for each stage and
// store it in a Map.
allStagesToExecs.foreach { case (stageId, execs) =>
val topLevelExecs = execs.filterNot(x => x.exec.startsWith("WholeStage"))
val childrenExecs = execs.flatMap(_.children).flatten
val allExecs = topLevelExecs ++ childrenExecs
val transitions = allExecs.zip(allExecs.drop(1)).count {
Reviewer: Add a comment about what this is doing. This definitely relies on the order, and I'm not sure we are great about making sure that is guaranteed. If it's required, we need to document that the execs have to be in the right order. It would also be nice to have a test that checks the order is correct when it gets here, so someone doesn't break it. I was originally thinking this would go in the plan parser, but if this is easier I'm OK with it as long as it's not brittle.

Author (nartal1), Aug 8, 2023: Added a comment. You are right that the order needs to be preserved. Will add a test.

Author (nartal1): The order is not preserved in all cases. Need to fix this.

case (exec1, exec2) =>
Reviewer: Rename these to something like currExec and nextExec; it might help readability.

Author (nartal1): Done.

(exec1.isSupported && !exec2.isSupported) || (!exec1.isSupported && exec2.isSupported)
}
stageIdToGpuCpuTransitions(stageId) = transitions
}
if (allStagesToExecs.isEmpty) {
// use job level
// also get the job ids associated with the SQLId
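To make the ordering assumption discussed above concrete, here is a hedged, standalone sketch of the same sliding-window transition count. ExecSketch is a hypothetical stand-in for ExecInfo; only the isSupported flag matters for the count.

```scala
// Hypothetical stand-in for ExecInfo; only isSupported is relevant here.
case class ExecSketch(exec: String, isSupported: Boolean)

// Count adjacent pairs whose support status flips (CPU -> GPU or GPU -> CPU).
// Like the code above, this relies on the execs being in plan order.
def countGpuCpuTransitions(execs: Seq[ExecSketch]): Int =
  execs.zip(execs.drop(1)).count { case (currExec, nextExec) =>
    currExec.isSupported != nextExec.isSupported
  }

// Example: GPU scan -> CPU-only UDF project -> GPU filter yields 2 transitions.
val sample = Seq(
  ExecSketch("FileSourceScan", isSupported = true),
  ExecSketch("ProjectWithUDF", isSupported = false),
  ExecSketch("Filter", isSupported = true))
assert(countGpuCpuTransitions(sample) == 2)
```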
@@ -670,7 +707,8 @@ class StageTaskQualificationSummary(
val stageAttemptId: Int,
var executorRunTime: Long,
var executorCPUTime: Long,
-    var totalTaskDuration: Long)
+    var totalTaskDuration: Long,
+    var totalbytesRead: Long)

case class QualApplicationInfo(
appName: String,
@@ -746,6 +784,8 @@ object QualificationAppInfo extends Logging {
val NOT_APPLICABLE = "Not Applicable"
val LOWER_BOUND_RECOMMENDED = 1.3
val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5
val CPU_GPU_TRANSFER_RATE = 10000000L
val SECONDS_TO_MILLISECONDS = 1000L

private def handleException(e: Exception, path: EventLogInfo): String = {
val message: String = e match {
@@ -72,16 +72,26 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
super.doSparkListenerTaskEnd(app, event)
// keep all stage task times to see for nonsql duration
val taskSum = app.stageIdToTaskEndSum.getOrElseUpdate(event.stageId, {
-      new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
+      new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
taskSum.totalTaskDuration += event.taskInfo.duration
// Add the total bytes read from the task if it's available. This is from inputMetrics if
// it is reading from datasource, or shuffleReadMetrics if it is reading from shuffle.
val inputMetrics = event.taskMetrics.inputMetrics
if (inputMetrics != null) {
taskSum.totalbytesRead += inputMetrics.bytesRead
}
val shuffleReadMetrics = event.taskMetrics.shuffleReadMetrics
if (shuffleReadMetrics != null) {
taskSum.totalbytesRead += shuffleReadMetrics.totalBytesRead
}

// Adds in everything (including failures)
app.stageIdToSqlID.get(event.stageId).foreach { sqlID =>
val taskSum = app.sqlIDToTaskEndSum.getOrElseUpdate(sqlID, {
-        new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
+        new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
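A rough sketch of the bytes-read accumulation this hunk performs. The case classes below are hypothetical stand-ins for the pieces of Spark's TaskMetrics used above (inputMetrics.bytesRead and shuffleReadMetrics.totalBytesRead); in the real listener the values come from the task-end event, and the per-stage total later feeds the transition penalty.

```scala
// Hypothetical stand-ins for the relevant parts of Spark's TaskMetrics.
case class InputMetricsSketch(bytesRead: Long)
case class ShuffleReadMetricsSketch(totalBytesRead: Long)
case class TaskMetricsSketch(
    inputMetrics: Option[InputMetricsSketch],
    shuffleReadMetrics: Option[ShuffleReadMetricsSketch])

// Accumulate a task's bytes read (data source input plus shuffle reads) into a stage total,
// skipping whichever metrics block is absent, as the null checks above do.
def addTaskBytesRead(stageTotalBytes: Long, metrics: TaskMetricsSketch): Long = {
  val input = metrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
  val shuffle = metrics.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L)
  stageTotalBytes + input + shuffle
}

// Example: a task that read 64 MB from the data source and 16 MB from shuffle.
val updated = addTaskBytesRead(0L,
  TaskMetricsSketch(Some(InputMetricsSketch(64L << 20)), Some(ShuffleReadMetricsSketch(16L << 20))))
assert(updated == (80L << 20))
```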