
Qualification tool: Add penalty for row conversions #471

Merged: 25 commits merged into NVIDIA:dev on Oct 16, 2023

Conversation

@nartal1 (Collaborator) commented Aug 2, 2023

This fixes #385

Until now, the qualification tool didn't account for the time taken by CPU fallbacks (ColumnarToRow conversions) caused by Execs not supported on the GPU, nor by RowToColumnar conversions. This PR adds these to the total durations so that the qualification tool's speedup estimate is closer to the actual speedup for Spark jobs.

There can be multiple transitions within a stage, i.e. some Execs in a stage may be supported on the GPU while others are not. However, we cannot get per-Exec durations from the Spark metrics, since there is no mapping between tasks and Execs. We do know the input size of each stage, whether it is read from an external source or produced by a shuffle. So the current implementation uses each stage's total input size and total number of transitions to estimate the total time taken by the transitions.
We still have to run benchmarks to get an accurate transfer speed between CPU and GPU and vice versa. For now, the transfer rate is based on the speedups observed on a couple of event logs.
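
For illustration, here is a minimal sketch of the per-stage estimate described above. The constant values and helper names are assumptions for the example, not the exact values or identifiers used by the tool:

```scala
object TransitionPenaltySketch {
  // Assumed effective CPU<->GPU transfer rate in bytes/second (PCIe Gen3 ballpark);
  // the real value is tuned against observed event logs.
  val CPU_GPU_TRANSFER_RATE: Double = 1.0e9
  val SECONDS_TO_MILLISECONDS: Long = 1000L

  // Estimate the time (in ms, to match Spark metrics) spent on row<->columnar
  // conversions for a stage, from its total input bytes and transition count.
  def transitionPenaltyMs(totalBytesRead: Long, numTransitions: Int): Long = {
    if (totalBytesRead <= 0 || numTransitions <= 0) 0L
    else ((totalBytesRead / CPU_GPU_TRANSFER_RATE) *
      SECONDS_TO_MILLISECONDS * numTransitions).toLong
  }
}
```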

@nartal1 nartal1 added the core_tools Scope the core module (scala) label Aug 2, 2023
@nartal1 nartal1 self-assigned this Aug 2, 2023
@nartal1 nartal1 marked this pull request as draft August 2, 2023 00:06
@nartal1 nartal1 marked this pull request as ready for review August 2, 2023 21:21
@tgravescs (Collaborator) left a comment

Is there any other information in the execs, like the number of rows, in the examples we are trying to hit?

I would definitely like to see what the output here is on various other workloads. I guess as long as we are more conservative it's ok. I'm worried about the cases where we read a ton of data but then filter it down very quickly; this estimate may be very off there.

val totalBytesRead =
  stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L).toDouble
if (totalBytesRead > 0) {
  val fallback_duration = (totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) *
    QualificationAppInfo.SECONDS_TO_MILLISECONDS * gpuCpuTransitions
Collaborator:

There is a toMillis (and others) in java.util.concurrent.TimeUnit.

Collaborator (Author):

done
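
For reference, a small sketch of the TimeUnit-based conversion the reviewer is pointing at; the variable names are illustrative:

```scala
import java.util.concurrent.TimeUnit

// Convert an estimated duration from seconds to milliseconds without a
// hand-rolled multiplier constant.
val estimatedSeconds: Long = 3L
val estimatedMillis: Long = TimeUnit.SECONDS.toMillis(estimatedSeconds) // 3000
```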

// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
// Duration in Spark metrics is in millisecond, so multiply this by 1000 to make
// it consistent
Collaborator:

Expand the comment as it's kind of left hanging: make it consistent between what two things?

Collaborator (Author):

Updated the comment.

val topLevelExecs = execs.filterNot(x => x.exec.startsWith("WholeStage"))
val childrenExecs = execs.flatMap(_.children).flatten
val allExecs = topLevelExecs ++ childrenExecs
val transitions = allExecs.zip(allExecs.drop(1)).count {
Collaborator:

Add a comment about what this is doing. This definitely relies on the order, and I'm not sure we are great about making sure that is guaranteed. If it's required, we need to make sure it's documented that it has to be in the right order. It would also be nice to have a test that the order is correct by the time it gets here, so someone doesn't break it. I was originally thinking this would be in the plan parser, but if this is easier I'm ok with it as long as it's not brittle.

nartal1 (Collaborator, Author) commented Aug 8, 2023

Added a comment. You are right that the order needs to be preserved. Will add a test.

Collaborator (Author):

The order is not preserved in all cases. Need to fix this.

val childrenExecs = execs.flatMap(_.children).flatten
val allExecs = topLevelExecs ++ childrenExecs
val transitions = allExecs.zip(allExecs.drop(1)).count {
case (exec1, exec2) =>
Collaborator:

Rename to something like currExec and nextExec; it might help readability.

Collaborator (Author):

done
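
For context, a self-contained sketch of the adjacent-pair transition counting under discussion, with the renamed variables. ExecInfo and its isSupported flag are stand-ins here; the real code operates on the plan parser's exec results and relies on them being in plan order:

```scala
case class ExecInfo(exec: String, isSupported: Boolean)

// Count CPU<->GPU transitions by pairing each exec with its successor and
// counting every place where GPU support flips. Only meaningful if allExecs
// is in plan order.
def countTransitions(allExecs: Seq[ExecInfo]): Int =
  allExecs.zip(allExecs.drop(1)).count {
    case (currExec, nextExec) => currExec.isSupported != nextExec.isSupported
  }

// Example: Scan (GPU) -> UDF (CPU) -> Project (GPU) has two transitions.
val execsInPlanOrder = Seq(
  ExecInfo("Scan parquet", isSupported = true),
  ExecInfo("CustomUDFExec", isSupported = false),
  ExecInfo("Project", isSupported = true))
assert(countTransitions(execsInPlanOrder) == 2)
```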

  StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
-   eachStageUnsupported, estimated)
+   eachStageUnsupported + transitionsTime, estimated)
Collaborator:

It might be nice to report the number of transitions we expect in the qual tool stage output; the idea behind that output was to be able to debug or figure out why we came up with a certain recommendation number.

Collaborator (Author):

Thanks for the suggestion. Added a column for the number of transitions in the qual tool stage output.
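
Roughly, the per-stage summary gains one more field so the recommendation can be traced back to how many conversions were assumed; the case class below is only an illustrative shape, not the PR's exact definition:

```scala
case class StageOutputRowSketch(
    stageId: Int,
    averageSpeedupFactor: Double,
    stageTaskTime: Long,            // total task duration for the stage (ms)
    unsupportedTaskDuration: Long,  // includes the estimated transitions time (ms)
    numTransitions: Int)            // new column: CPU<->GPU transitions in the stage
```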

@@ -241,13 +262,74 @@ class QualificationAppInfo(
stages.map { stageId =>
val stageTaskTime = stageIdToTaskEndSum.get(stageId)
.map(_.totalTaskDuration).getOrElse(0L)
val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
Collaborator:

If we have a flag that forces numTransitions to be 0, then theoretically we can disable the fall-back penalty, right?

Collaborator (Author):

Updated the PR to add a config for transitions. The default is true; we can disable the penalty by setting the config to false. Please let me know if it's fine.
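
A rough sketch of how such a toggle can zero out the penalty; the flag name and the sample map contents are made up for the example:

```scala
// Hypothetical flag: when false, no transitions are counted, so the
// row-conversion penalty drops out of the estimate entirely.
val penalizeTransitions: Boolean = true

val stageIdToGpuCpuTransitions: Map[Int, Int] = Map(1 -> 3, 2 -> 0) // sample data
val stageId = 1

val numTransitions =
  if (penalizeTransitions) stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
  else 0
```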

@amahussein (Collaborator):

Needs an up-merge to get rid of the python failures

amahussein previously approved these changes Aug 23, 2023

@amahussein (Collaborator) left a comment

Thanks @nartal1 !

@@ -174,6 +179,23 @@ class QualificationAppInfo(
res
}

private def calculateNoExecsStageDurations(all: Seq[StageQualSummaryInfo]): Long = {
Collaborator:

Nit: rename to calculateExecsNoStageDurations. Or is this actually durations due to transitions? Then it should have something like that in the name.

Collaborator (Author):

Removed this method. Filed a follow-on issue to add penalties for Execs not associated with any stages: #514

case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
case true => 0
}
// val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
Collaborator:

remove commented out line

Collaborator (Author):

done

// Update totaltaskduration of stageIdToTaskEndSum to include transitions time
val stageIdToTasksMetrics = stageIdToTaskEndSum.get(stageId).orElse(None)
if (stageIdToTasksMetrics.isDefined) {
stageIdToTasksMetrics.get.totalTaskDuration += transitionsTime
Collaborator:

I'm confused by this. Why are we changing the task durations here? This has traditionally been the real task durations, and then we add/remove things later. Is this because supported + unsupported is now longer due to the transition time? It seems odd to change it in this data structure, which holds the real values from the file.

Collaborator (Author):

This is done because we are adding transitionsTime to unsupportedTaskDuration, i.e. unsupportedTaskDuration = eachStageUnsupported + transitionsTime. So the totalTaskDuration should also include transitionsTime, otherwise we would end up in a case where unsupportedTaskDuration > totalTaskDuration (which would be incorrect).

nartal1 (Collaborator, Author) commented Aug 28, 2023

Updated this code. Now we consider transitionsTime in the unsupported durations only. stageTaskTime is the totalTaskDuration from the event log. transitionTime is returned from StageQualSummaryInfo so that it can be used in the calculation of calculateNonSQLTaskDataframeDuration.
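
A rough sketch of how the final bookkeeping described here fits together; the field names and numbers are illustrative, not the PR's exact ones:

```scala
// The transition penalty is folded into the unsupported duration, while the
// stage task time stays the raw value from the event log. The transition time
// is also carried separately for downstream calculations.
case class StageSummarySketch(
    stageId: Int,
    stageTaskTime: Long,       // raw total task duration from the event log (ms)
    unsupportedTaskDur: Long,  // includes the estimated transitions time (ms)
    transitionTime: Long)      // estimated row<->columnar conversion time (ms)

val eachStageUnsupported = 6000L
val transitionsTime = 1500L
val summary = StageSummarySketch(
  stageId = 3,
  stageTaskTime = 10000L,
  unsupportedTaskDur = eachStageUnsupported + transitionsTime,
  transitionTime = transitionsTime)
```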

@@ -161,11 +166,11 @@ class QualificationAppInfo(
}

private def calculateSQLSupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = {
-    all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum
+    all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum - calculateNoExecsStageDurations(all)
Collaborator:

So I'm a bit unclear how this works with the job overhead we add later and/or the mapping we already try to do for execs without stages.

Is this counting it twice?

Collaborator (Author):

Removed this.

private def calculateNoExecsStageDurations(all: Seq[StageQualSummaryInfo]): Long = {
// If there are Execs not associated with any stage, then some of the Execs may not be
// supported on GPU. We need to estimate the duration of these Execs and add it to the
// unsupportedTaskDur. We estimate the duration by taking the average of the unsupportedTaskDur
Collaborator:

I'm not sure I follow this estimation. We are trying to give some penalty for execs that have transitions but don't map to a stage (i.e. we don't have a duration), correct?

I'm wondering if we are already calculating this with either the stages with no execs or the job overhead time.

@nartal1 (Collaborator, Author) commented Sep 6, 2023

Marking this as a draft to do more tests and get a better bandwidth number.

@nartal1 nartal1 marked this pull request as draft September 6, 2023 19:08
@nartal1 nartal1 marked this pull request as ready for review October 9, 2023 18:24
@nartal1 nartal1 requested a review from tgravescs October 10, 2023 02:03
@nartal1 (Collaborator, Author) commented Oct 16, 2023

Thanks for the review @tgravescs and @mattahrens! Merging this.

@nartal1 nartal1 merged commit 89361b1 into NVIDIA:dev Oct 16, 2023
8 checks passed
Labels: core_tools Scope the core module (scala)

Linked issue: [BUG] Qualification tool recommends jobs with many expensive row conversions