Skip to content

Commit

Permalink
Profiling tool throws NPE when appInfo is null and unchecked
Browse files Browse the repository at this point in the history
Signed-off-by: Kuhu Shukla <[email protected]>
  • Loading branch information
kuhushukla committed Dec 29, 2023
1 parent 40b1b9e commit c94115c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
SQLDurationExecutorTimeProfileResult(app.index, app.appId, sqlId, sqlCase.duration,
sqlCase.hasDatasetOrRDD, app.appInfo.duration, sqlCase.problematic,
sqlCase.hasDatasetOrRDD,
Option(app.appInfo).flatMap(_.duration).orElse(Option(0L)),
sqlCase.problematic,
sqlCase.sqlCpuTimePercent)
}
}

if (allRows.size > 0) {
val sortedRows = allRows.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,25 @@ case class StageMetrics(numTasks: Int, duration: String)
class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {

def getAppInfo: Seq[AppInfoProfileResults] = {
val allRows = apps.map { app =>
val a = app.appInfo
val allRows = apps.collect {
case app if app.appInfo != null => val a = app.appInfo
AppInfoProfileResults(app.index, a.appName, a.appId,
a.sparkUser, a.startTime, a.endTime, a.duration,
a.durationStr, a.sparkVersion, a.pluginEnabled)
}
if (allRows.size > 0) {
if (allRows.nonEmpty) {
allRows.sortBy(cols => (cols.appIndex))
} else {
Seq.empty
}
}

def getAppLogPath: Seq[AppLogPathProfileResults] = {
val allRows = apps.map { app =>
val a = app.appInfo
val allRows = apps.collect {
case app if app.appInfo != null => val a = app.appInfo
AppLogPathProfileResults(app.index, a.appName, a.appId, app.eventLogPath)
}
if (allRows.size > 0) {
if (allRows.nonEmpty) {
allRows.sortBy(cols => (cols.appIndex))
} else {
Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,17 @@ class AnalysisSuite extends FunSuite {
val containsDs = sqlDurAndCpu.filter(_.containsDataset === true)
assert(containsDs.size == 1)
}

test("test duration for null appInfo") {
val qualLogDir = ToolTestUtils.getTestResourcePath("spark-events-qualification")
val logs = Array(s"$qualLogDir/dataset_eventlog")

val apps = ToolTestUtils.processProfileApps(logs, sparkSession)
apps.foreach { app =>
app.appInfo = null
}
val analysis = new Analysis(apps)
val metrics = analysis.sqlMetricsAggregationDurationAndCpuTime()
metrics.foreach(m => assert(m.appDuration.get == 0L))
}
}

0 comments on commit c94115c

Please sign in to comment.