Skip to content

Commit

Permalink
minor style updates
Browse files Browse the repository at this point in the history
Signed-off-by: cindyyuanjiang <[email protected]>
  • Loading branch information
cindyyuanjiang committed Dec 24, 2024
1 parent 43f5c84 commit e8c2814
Showing 1 changed file with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,12 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* to stageToDiagnosticMetrics mapping.
*/
private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
val stageId = accum.stageId

// Initialize an empty mapping for the stage if it doesn't already exist
if (!stageToDiagnosticMetrics.contains(stageId)) {
stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults]
if (!stageToDiagnosticMetrics.contains(accum.stageId)) {
stageToDiagnosticMetrics(accum.stageId) = HashMap.empty[String, AccumProfileResults]
}

stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum
stageToDiagnosticMetrics(accum.stageId)(accum.accMetaRef.getName()) = accum
}

/**
Expand Down Expand Up @@ -401,12 +399,10 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* @return A sequence of `IODiagnosticResult` objects one per SQL stage and node.
*/
def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = {
// val emptyAccumInfo = new AccumInfo(AccumMetaRef.EMPTY_ACCUM_META_REF)
// Transform the diagnostic metrics map into a sequence of results
IODiagnosticMetricsMap.toSeq.flatMap { case ((sqlId, nodeId), sqlAccums) =>
// Process each stage ID and compute diagnostic results
val stageIds = sqlAccums.head.stageIds
val stageIdsSize = stageIds.size
stageIds.flatMap { stageId =>
val stageTaskIds = getStageTaskIds(stageId)
val nodeName = sqlAccums.head.nodeName
Expand All @@ -417,16 +413,17 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap

// Process each accumulator for the current SQL stage
sqlAccums.foreach { sqlAccum =>
// TODO: need to check if accum ID is in driverAccumMap, currently skipped
// TODO: check if accumulator ID is in driverAccumMap, currently skipped
val accumInfo = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId)

val metricStats: Option[StatisticsMetrics] =
if (accumInfo.isEmpty || !accumInfo.get.stageValuesMap.contains(stageId)) {
None
} else if (stageIdsSize == 1) {
} else if (stageIds.size == 1) {
// Skip computing statistics when there is only one stage
Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total))
} else {
// Retrieve task updates correspond to the current stage
// Retrieve task updates which correspond to the current stage
val filteredTaskUpdates = filterAccumTaskUpdatesForStage(accumInfo.get, stageTaskIds)
StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates)
}
Expand Down

0 comments on commit e8c2814

Please sign in to comment.