Optimize implementation of getAggregateRawMetrics in core-tools #1468

Merged
amahussein merged 2 commits into NVIDIA:dev from rapids-tools-1461-part02 on Dec 18, 2024

Conversation

@amahussein (Collaborator) commented Dec 17, 2024

Signed-off-by: Ahmed Hussein (amahussein) [email protected]

Contributes to #1461

This commit improves the implementation of aggregation across raw metrics by replacing the built-in Scala collections with accumulators.

In the legacy implementation:

The cost of aggregation was too high:

  • O(m × n), where n is the number of tasks and m is the number of metric fields in TaskModel.
  • Using filterKeys turned out to be very expensive because:
    • It eventually visits all the keys in the hashMap. For aggregation this means O(m × n^2), where m is the number of keys in the hashMap and n is the number of stages per SQL/Job.
    • Internally, it creates a complete new hashMap, which adds to the memory pressure.

In the new implementation:

  • Using a single case class with var fields is much cheaper because (see the sketch after this list):
    • It costs a single allocation.
    • It reduces cache pollution, because the accumulator object is likely to stay in the cache, versus re-visiting all the tasks each time we accumulate a field.
    • This reduces the cost to O(K × n), where n is the number of tasks and K is a constant (the number of fields).
  • Replacing filterKeys with a loop over the stage IDs, using filter and contains:
    • This guarantees that we only visit the keys we are interested in.
    • There are no implicit allocations.
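
Below is a minimal, self-contained Scala sketch of the two ideas above. The names (Task, TaskMetricsAccum, stagesOfInterest) are illustrative stand-ins, not the actual classes in this PR:

```scala
// All names here are illustrative stand-ins, not the actual tools classes.
object AggSketch {
  // Hypothetical task record; the real TaskModel has many more metric fields.
  case class Task(durationMs: Long, cpuTimeMs: Long)

  // Accumulator with var fields: a single allocation, updated in place while
  // the tasks are visited exactly once.
  class TaskMetricsAccum {
    var numTasks: Int = 0
    var totalDurationMs: Long = 0L
    var totalCpuTimeMs: Long = 0L

    def addTask(t: Task): Unit = {
      numTasks += 1
      totalDurationMs += t.durationMs
      totalCpuTimeMs += t.cpuTimeMs
    }
  }

  // Legacy style: one collection pass over all the tasks per metric field.
  def legacyAgg(tasks: Iterable[Task]): (Long, Long) =
    (tasks.map(_.durationMs).sum, tasks.map(_.cpuTimeMs).sum)

  // New style: a single pass, accumulating every field into the same record.
  def accumAgg(tasks: Iterable[Task]): TaskMetricsAccum = {
    val acc = new TaskMetricsAccum
    tasks.foreach(acc.addTask)
    acc
  }

  // Replacing filterKeys with an explicit walk over the stage IDs of interest,
  // so only the relevant keys are visited and no intermediate map is built.
  def stagesOfInterest(stageMetrics: Map[Int, TaskMetricsAccum],
      stageIds: Seq[Int]): Seq[TaskMetricsAccum] =
    stageIds.filter(stageMetrics.contains).map(id => stageMetrics(id))
}
```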

Performance impact

  Total CPU time: 305,576 ms    ->  down by 80%
  Total time: 2,228,968 ms      ->  down by 80% (from 23-28 minutes to 5-6 minutes)
  Total allocation: 360.43 GB   ->  down by 94%

getAggRawMetrics:
  Memory: 2.55 GB
  CPU: 740
  Time: 740

Code changes in detail

This pull request introduces significant changes to the AppSparkMetricsAnalyzer class and related utility classes to improve the aggregation of Spark metrics. The changes include the addition of helper classes for accumulating metrics and the refactoring of existing methods to use these helpers. The most important changes are outlined below:

Refactoring and Code Simplification:

  • Refactored the AppSparkMetricsAnalyzer class to use the new AggAccumHelper and AggAccumPhotonHelper classes for aggregating metrics, simplifying the code and improving readability.

New Helper Classes:

  • Added AggAccumHelper class to facilitate the accumulation of aggregate metrics, allowing for future customization and parallel processing.
  • Added AggAccumPhotonHelper class to extend AggAccumHelper for Photon-specific metrics, handling shuffle write values and peak memory values (a sketch of this split follows below).
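
A rough sketch of how such a helper/subclass split can look. The names, fields, and method signatures below are assumptions for illustration, not the actual AggAccumHelper / AggAccumPhotonHelper API:

```scala
// Self-contained sketch; names and fields are illustrative only.
object PhotonHelperSketch {
  case class TaskRec(taskId: Long, durationMs: Long, shuffleWriteTimeMs: Long)

  class ShuffleAccum {
    var numTasks: Int = 0
    var totalDurationMs: Long = 0L
    var totalShuffleWriteMs: Long = 0L
  }

  class AccumHelperSketch {
    // Hook a subclass can override for engine-specific metric sources.
    protected def shuffleWriteTime(t: TaskRec): Long = t.shuffleWriteTimeMs

    def accumPerStage(tasks: Iterable[TaskRec]): ShuffleAccum = {
      val acc = new ShuffleAccum
      tasks.foreach { t =>
        acc.numTasks += 1
        acc.totalDurationMs += t.durationMs
        acc.totalShuffleWriteMs += shuffleWriteTime(t)
      }
      acc
    }
  }

  // Photon variant: the shuffle write time comes from Photon-specific
  // accumulables keyed by task ID rather than from the standard task metric.
  class PhotonAccumHelperSketch(photonShuffleWriteMs: Map[Long, Long])
      extends AccumHelperSketch {
    override protected def shuffleWriteTime(t: TaskRec): Long =
      photonShuffleWriteMs.getOrElse(t.taskId, 0L)
  }
}
```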

New Accumulator Classes:

  • Introduced JobAggAccum class to optimize the aggregation of job-level metrics by avoiding a separate Scala collections pass per field over all the tasks/stages in a job.
  • Introduced SQLAggAccum class to optimize the aggregation of SQL-level metrics, including the calculation of the executor CPU ratio and the average input bytes read (a rough sketch follows below).
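
For the SQL-level record, derived values such as the CPU ratio can be computed once after the accumulation pass, instead of re-walking the tasks. A rough sketch under assumed field names and formulas (not the exact SQLAggAccum code):

```scala
// Illustrative SQL-level accumulator; field names, units, and formulas
// below are assumptions, not the exact SQLAggAccum implementation.
class SqlAggSketch {
  var numTasks: Long = 0L
  var executorRunTimeMs: Long = 0L
  var executorCpuTimeNs: Long = 0L
  var inputBytesRead: Long = 0L

  // Derived values, computed once at the end of the accumulation pass.
  var executorCpuRatio: Double = 0.0
  var avgInputBytesRead: Double = 0.0

  def finalizeAggregation(): Unit = {
    if (executorRunTimeMs > 0) {
      // Spark reports executor CPU time in nanoseconds and run time in ms.
      executorCpuRatio = (executorCpuTimeNs / 1e6) / executorRunTimeMs * 100.0
    }
    if (numTasks > 0) {
      avgInputBytesRead = inputBytesRead.toDouble / numTasks
    }
  }
}
```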

Import and Dependency Cleanup:

  • Cleaned up imports in AppSparkMetricsAnalyzer.scala, removing unused imports and consolidating others for better organization.

These changes collectively improve the maintainability and performance of the AppSparkMetricsAnalyzer class by leveraging the new helper and accumulator classes.

@amahussein added the core_tools (Scope the core module (scala)) label on Dec 17, 2024
@amahussein self-assigned this on Dec 17, 2024
@@ -517,12 +485,4 @@ object AppSparkMetricsAnalyzer {
arr.max
}
}

@amahussein (Collaborator, Author):

Removed because it is not used anymore

stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
}
}
}


object AppSparkMetricsAnalyzer {
def getDurations(tcs: Iterable[TaskModel]): (Long, Long, Long, Double) = {
@amahussein (Collaborator, Author):

Removed because it is not used anymore

@parthosa (Collaborator) left a comment:

Thanks @amahussein for this design refactor. Minor comments.

@@ -182,66 +176,55 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
if (app.sqlIdToStages.contains(sqlId)) {
val stagesInSQL = app.sqlIdToStages(sqlId)
// TODO: Should we only consider successful tasks?
val cachedResBySQL = stageLevelSparkMetrics(index).filterKeys(stagesInSQL.contains).values
Collaborator:

nit: Can we combine filter and map and use collect to process in a single pass?
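
For illustration, a minimal sketch of the suggested single-pass collect, with stand-in names in place of the actual fields:

```scala
object CollectSketch {
  // Hypothetical stand-ins for stageLevelSparkMetrics(index) and stagesInSQL.
  val stageMetrics: Map[Int, String] = Map(1 -> "s1", 2 -> "s2", 3 -> "s3")
  val stagesInSQL: Set[Int] = Set(1, 3)

  // Original approach: filterKeys followed by .values.
  val twoStep = stageMetrics.filterKeys(stagesInSQL.contains).values

  // Suggested approach: collect filters the entries and extracts the values
  // in a single pass.
  val onePass = stageMetrics.collect {
    case (stageId, rec) if stagesInSQL.contains(stageId) => rec
  }
}
```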

@amahussein (Collaborator, Author):

Done!

if (profResultsInJob.isEmpty) {
val jobAggAccumulator = new AggAccumHelper()
val perJobRec = jobAggAccumulator.accumPerJob(
jc.stageIds.filter(stageLevelSparkMetrics(index).contains)
Collaborator:

Similarly, can we replace filter and map with collect?

@amahussein (Collaborator, Author):

Done!

*/
def isEmptyAggregates: Boolean = numTasks == 0

def resetFields(): Unit = {
Collaborator:

nit: Can we add a comment on why we need to reset the fields here?

@amahussein (Collaborator, Author):

Done! Also refactored the code to do that within the class, which is better OOP.

@amahussein (Collaborator, Author) left a comment:

Thanks @parthosa and @nartal1 for your comments.
I addressed your comments in the last commit and made a small change in aggDiagnostics.

To verify that this PR did not change the behavior, I did a diff between the output folders of the Profiler tool. The output matched perfectly.

}
}

def minWithEmptyHandling(arr: Iterable[Long]): Long = {
@amahussein (Collaborator, Author):

Removed because it is unused

val nodeNames = sqlAnalyzer.stageToNodeNames.getOrElse(sm.stageInfo.stageId, emptyNodeNames)
val diagnosticMetricsMap =
sqlAnalyzer.stageToDiagnosticMetrics
.getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics)
@amahussein (Collaborator, Author):

Reformatted the code because it was hard to read that withDefaultValue is applied on getOrElse.

Comment on lines +325 to +327
AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L)
val emptyNodeNames = Seq.empty[String]
val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
@amahussein (Collaborator, Author):

@cindyyuanjiang

  • It is better to avoid creating metrics/nodeNames with empty strings, because they are harder to notice and could lead to other problems in the CSV files or when joining on metric names. That's why I replaced the empty string with "N/A".
  • Moved the creation of default values outside the map block.

Collaborator:

thanks @amahussein!

@nartal1 (Collaborator) left a comment:

Thanks @amahussein! This is a great refactor. Runtime is down by 80%, and the memory usage optimization is nice indeed.

@parthosa (Collaborator) left a comment:

Thanks @amahussein. LGTM.

@@ -45,22 +40,19 @@ class AggAccumHelper {

def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = {
val resRec = createStageAccumRecord()
initializeRecord(resRec, taskRecords)
Collaborator:

I was also wondering about the need for initializeRecord() before.

@amahussein merged commit 18b0472 into NVIDIA:dev on Dec 18, 2024
15 checks passed
@amahussein deleted the rapids-tools-1461-part02 branch on December 18, 2024 at 17:31