diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala index 62e63d3d2..f5acfa4a0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala @@ -388,11 +388,12 @@ class Analysis(apps: Seq[ApplicationInfo]) { val allRows = apps.flatMap { app => app.sqlIdToInfo.map { case (sqlId, sqlCase) => SQLDurationExecutorTimeProfileResult(app.index, app.appId, sqlId, sqlCase.duration, - sqlCase.hasDatasetOrRDD, app.appInfo.duration, sqlCase.problematic, + sqlCase.hasDatasetOrRDD, + Option(app.appInfo).flatMap(_.duration).orElse(Option(0L)), + sqlCase.problematic, sqlCase.sqlCpuTimePercent) } } - if (allRows.size > 0) { val sortedRows = allRows.sortBy { cols => val sortDur = cols.duration.getOrElse(0L) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala index 6b577f437..1257435ac 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala @@ -34,13 +34,13 @@ case class StageMetrics(numTasks: Int, duration: String) class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { def getAppInfo: Seq[AppInfoProfileResults] = { - val allRows = apps.map { app => - val a = app.appInfo + val allRows = apps.collect { + case app if app.appInfo != null => val a = app.appInfo AppInfoProfileResults(app.index, a.appName, a.appId, a.sparkUser, a.startTime, a.endTime, a.duration, a.durationStr, a.sparkVersion, a.pluginEnabled) } - if (allRows.size > 0) { + if (allRows.nonEmpty) { allRows.sortBy(cols => (cols.appIndex)) } else { Seq.empty @@ -48,11 +48,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { } def getAppLogPath: Seq[AppLogPathProfileResults] = { - val allRows = apps.map { app => - val a = app.appInfo + val allRows = apps.collect { + case app if app.appInfo != null => val a = app.appInfo AppLogPathProfileResults(app.index, a.appName, a.appId, app.eventLogPath) } - if (allRows.size > 0) { + if (allRows.nonEmpty) { allRows.sortBy(cols => (cols.appIndex)) } else { Seq.empty diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index 44c87fc32..8af55025f 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -138,4 +138,17 @@ class AnalysisSuite extends FunSuite { val containsDs = sqlDurAndCpu.filter(_.containsDataset === true) assert(containsDs.size == 1) } + + test("test duration for null appInfo") { + val qualLogDir = ToolTestUtils.getTestResourcePath("spark-events-qualification") + val logs = Array(s"$qualLogDir/dataset_eventlog") + + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + apps.foreach { app => + app.appInfo = null + } + val analysis = new Analysis(apps) + val metrics = analysis.sqlMetricsAggregationDurationAndCpuTime() + metrics.foreach(m => assert(m.appDuration.get == 0L)) + } }