Skip to content

Commit

Permalink
Remove stage diagnostics from ApplicationSummaryInfo
Browse files Browse the repository at this point in the history
Signed-off-by: cindyyuanjiang <[email protected]>
  • Loading branch information
cindyyuanjiang committed Nov 13, 2024
1 parent 3a8cf9e commit 1361d53
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ case class ApplicationSummaryInfo(
ioMetrics: Seq[IOAnalysisProfileResult],
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
stageDiagnostics: Seq[StageDiagnosticResult])
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows:
}
}

// Writes `outRows` as a CSV table named after `headerText`, delegating to the
// companion object's `writeCSVTable` with this writer's `outputDir`.
// NOTE(review): `emptyTableText` and `tableDesc` are accepted here but never
// forwarded to the companion helper — confirm whether they should be passed
// through (as the sibling `write` method does with `tableDesc`) or dropped
// from this signature.
def writeCSVTable(headerText: String, outRows: Seq[ProfileResult],
emptyTableText: Option[String] = None, tableDesc: Option[String] = None): Unit = {
ProfileOutputWriter.writeCSVTable(headerText, outRows, outputDir)
}

def close(): Unit = {
textFileWriter.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
Profiler.COMPARE_LOG_FILE_NAME_PREFIX, numOutputRows, outputCSV = outputCSV)
try {
// we need the info for all of the apps to be able to compare so this happens serially
val (sums, comparedRes) = processApps(apps, printPlans = false, profileOutputWriter)
val (sums, comparedRes, diagnostics) =
processApps(apps, printPlans = false, profileOutputWriter)
progressBar.foreach(_.reportSuccessfulProcesses(apps.size))
writeSafelyToOutput(profileOutputWriter, Seq(sums), false, comparedRes)
writeSafelyToOutput(profileOutputWriter, Seq(sums), false, comparedRes,
Seq(diagnostics))
} catch {
case _: Exception =>
progressBar.foreach(_.reportFailedProcesses(apps.size))
Expand All @@ -97,8 +99,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
// combine them into single tables in the output.
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/combined",
Profiler.COMBINED_LOG_FILE_NAME_PREFIX, numOutputRows, outputCSV = outputCSV)
val sums = createAppsAndSummarize(eventLogInfos, profileOutputWriter)
writeSafelyToOutput(profileOutputWriter, sums, outputCombined)
val (sums, diagnostics) = createAppsAndSummarize(eventLogInfos, profileOutputWriter)
writeSafelyToOutput(profileOutputWriter, sums, outputCombined, diagnosticSum = diagnostics)
profileOutputWriter.close()
}
} else {
Expand Down Expand Up @@ -226,14 +228,18 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
allApps.asScala.toSeq
}

private def createAppsAndSummarize(allPaths: Seq[EventLogInfo],
profileOutputWriter: ProfileOutputWriter): Seq[ApplicationSummaryInfo] = {
private def createAppsAndSummarize(
allPaths: Seq[EventLogInfo],
profileOutputWriter: ProfileOutputWriter)
: (Seq[ApplicationSummaryInfo], Seq[DiagnosticSummaryInfo]) = {
val allApps = new ConcurrentLinkedQueue[ApplicationSummaryInfo]()
val allDiagnostics = new ConcurrentLinkedQueue[DiagnosticSummaryInfo]()

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = profileApp(path, index, { app =>
val (s, _) = processApps(Seq(app), false, profileOutputWriter)
val (s, _, d) = processApps(Seq(app), false, profileOutputWriter)
allApps.add(s)
allDiagnostics.add(d)
})
}

Expand All @@ -254,7 +260,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
" stopping processing any more event logs")
threadPool.shutdownNow()
}
allApps.asScala.toSeq
(allApps.asScala.toSeq, allDiagnostics.asScala.toSeq)
}

private def createAppAndProcess(
Expand All @@ -265,8 +271,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/${app.appId}",
Profiler.PROFILE_LOG_NAME, numOutputRows, outputCSV = outputCSV)
try {
val (sum, _) = processApps(Seq(app), appArgs.printPlans(), profileOutputWriter)
writeSafelyToOutput(profileOutputWriter, Seq(sum), false)
val (sum, _, diagnostics) =
processApps(Seq(app), appArgs.printPlans(), profileOutputWriter)
writeSafelyToOutput(profileOutputWriter, Seq(sum), false,
diagnosticSum = Seq(diagnostics))
} finally {
profileOutputWriter.close()
}
Expand Down Expand Up @@ -311,7 +319,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
apps: Seq[ApplicationInfo],
printPlans: Boolean,
profileOutputWriter: ProfileOutputWriter)
: (ApplicationSummaryInfo, Option[CompareSummaryInfo]) = {
: (ApplicationSummaryInfo, Option[CompareSummaryInfo], DiagnosticSummaryInfo) = {
val startTime = System.currentTimeMillis()

val collect = new CollectInformation(apps)
Expand Down Expand Up @@ -393,8 +401,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo,
analysis.stageDiagnostics), compareRes)
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo),
compareRes, DiagnosticSummaryInfo(analysis.stageDiagnostics))
}

/**
Expand Down Expand Up @@ -428,7 +436,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea

def writeOutput(profileOutputWriter: ProfileOutputWriter,
appsSum: Seq[ApplicationSummaryInfo], outputCombined: Boolean,
comparedRes: Option[CompareSummaryInfo] = None): Unit = {
comparedRes: Option[CompareSummaryInfo] = None,
diagnosticSum: Seq[DiagnosticSummaryInfo]): Unit = {

val sums = if (outputCombined) {
// the properties table here has the column names as the app indexes so we have to
Expand Down Expand Up @@ -489,8 +498,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
appsSum.flatMap(_.ioMetrics).sortBy(_.appIndex),
combineProps("system", appsSum).sortBy(_.key),
appsSum.flatMap(_.sqlCleanedAlignedIds).sortBy(_.appIndex),
appsSum.flatMap(_.sparkRapidsBuildInfo),
appsSum.flatMap(_.stageDiagnostics)
appsSum.flatMap(_.sparkRapidsBuildInfo)
)
Seq(reduced)
} else {
Expand Down Expand Up @@ -537,7 +545,6 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val skewHeader = TASK_SHUFFLE_SKEW
val skewTableDesc = AGG_DESCRIPTION(TASK_SHUFFLE_SKEW)
profileOutputWriter.write(skewHeader, app.skewInfo, tableDesc = Some(skewTableDesc))
profileOutputWriter.write(STAGE_DIAGNOSTICS_LABEL, app.stageDiagnostics)

profileOutputWriter.writeText("\n### C. Health Check###\n")
profileOutputWriter.write(ProfFailedTaskView.getLabel, app.failedTasks)
Expand All @@ -560,6 +567,14 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
profileOutputWriter.writeSparkRapidsBuildInfo("Spark Rapids Build Info",
app.sparkRapidsBuildInfo)
}
val diagnostics = if (outputCombined) {
Seq(DiagnosticSummaryInfo(diagnosticSum.flatMap(_.stageDiagnostics)))
} else {
diagnosticSum
}
diagnostics.foreach { diagnostoic =>
profileOutputWriter.writeCSVTable(STAGE_DIAGNOSTICS_LABEL, diagnostoic.stageDiagnostics)
}
}

/**
Expand All @@ -569,9 +584,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
*/
private def writeSafelyToOutput(profileOutputWriter: ProfileOutputWriter,
appsSum: Seq[ApplicationSummaryInfo], outputCombined: Boolean,
comparedRes: Option[CompareSummaryInfo] = None): Unit = {
comparedRes: Option[CompareSummaryInfo] = None,
diagnosticSum: Seq[DiagnosticSummaryInfo]): Unit = {
try {
writeOutput(profileOutputWriter, appsSum, outputCombined, comparedRes)
writeOutput(profileOutputWriter, appsSum, outputCombined, comparedRes, diagnosticSum)
} catch {
case e: Exception =>
logError("Exception thrown while writing", e)
Expand Down

0 comments on commit 1361d53

Please sign in to comment.