diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala index 4586cab48..866e1fbbd 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala @@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.tool.tuning.ClusterProperties import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo} -import org.apache.spark.sql.rapids.tool.util.StringUtils +import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils} /** * Utility object containing constants for various platform names. @@ -132,6 +132,19 @@ abstract class Platform(var gpuDevice: Option[GpuDevice], var recommendedClusterInfo: Option[RecommendedClusterInfo] = None // the number of GPUs to use, this might be updated as we handle different cases var numGpus: Int = 1 + // Default runtime for the platform + val defaultRuntime: SparkRuntime.SparkRuntime = SparkRuntime.SPARK + // Set of supported runtimes for the platform + protected val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set( + SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS + ) + + /** + * Checks if the given runtime is supported by the platform. + */ + def isRuntimeSupported(runtime: SparkRuntime.SparkRuntime): Boolean = { + supportedRuntimes.contains(runtime) + } // This function allow us to have one gpu type used by the auto // tuner recommendations but have a different GPU used for speedup @@ -511,6 +524,10 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice], override val defaultGpuDevice: GpuDevice = T4Gpu override def isPlatformCSP: Boolean = true + override val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set( + SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS, SparkRuntime.PHOTON + ) + // note that Databricks generally sets the spark.executor.memory for the user. 
Our // auto tuner heuristics generally sets it lower then Databricks so go ahead and // allow our auto tuner to take affect for this in anticipation that we will use more diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala index 9580aa470..7ca4bbb5b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala @@ -16,6 +16,7 @@ package com.nvidia.spark.rapids.tool.analysis +import scala.collection.breakOut import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet} import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults} @@ -265,7 +266,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap val jobsWithSQL = app.jobIdToInfo.filter { case (_, j) => j.sqlID.nonEmpty } - val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) => + jobsWithSQL.flatMap { case (jobId, j) => val stages = j.stageIds val stagesInJob = app.stageManager.getStagesByIds(stages) stagesInJob.map { sModel => @@ -283,8 +284,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId, sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames) } - } - sqlToStages.toSeq + }(breakOut) } def generateSQLAccums(): Seq[SQLAccumProfileResults] = { @@ -294,20 +294,11 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId) val driverMax = driverAccumsOpt match { case Some(accums) => - val filtered = accums.filter { a => - a.sqlID == metric.sqlID - } - val accumValues = filtered.map(_.value).sortWith(_ < _) - if (accumValues.isEmpty) { - None - } else if (accumValues.length <= 1) { - Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum)) - } else { - Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2), - accumValues(accumValues.size - 1), accumValues.sum)) - } - case None => - None + StatisticsMetrics.createOptionalFromArr(accums.collect { + case a if a.sqlID == metric.sqlID => + a.value + }(breakOut)) + case _ => None } if (accumTaskStats.isDefined || driverMax.isDefined) { @@ -325,7 +316,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap } else { None } - } + }(breakOut) } /** @@ -341,40 +332,31 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap def generateStageLevelAccums(): Seq[AccumProfileResults] = { app.accumManager.accumInfoMap.flatMap { accumMapEntry => val accumInfo = accumMapEntry._2 - accumInfo.stageValuesMap.keySet.flatMap( stageId => { - val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet - // get the task updates that belong to that stage - val taskUpatesSubset = - accumInfo.taskUpdatesMap.filterKeys(stageTaskIds.contains).values.toSeq.sorted - if (taskUpatesSubset.isEmpty) { - None - } else { - val min = taskUpatesSubset.head - val max = taskUpatesSubset.last - val sum = taskUpatesSubset.sum - val median = if (taskUpatesSubset.size % 2 == 0) { - val mid = taskUpatesSubset.size / 2 - (taskUpatesSubset(mid) + taskUpatesSubset(mid - 1)) / 2 - } else { - 
taskUpatesSubset(taskUpatesSubset.size / 2) - } - // reuse AccumProfileResults to avoid generating extra memory from allocating new objects - val accumProfileResults = AccumProfileResults( - appIndex, - stageId, - accumInfo.infoRef, - min = min, - median = median, - max = max, - total = sum) - if (accumInfo.infoRef.name.isDiagnosticMetrics()) { - updateStageDiagnosticMetrics(accumProfileResults) - } - Some(accumProfileResults) + accumInfo.stageValuesMap.keys.flatMap( stageId => { + val stageTaskIds: Set[Long] = + app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut) + // Get the task updates that belong to that stage + StatisticsMetrics.createOptionalFromArr( + accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match { + case Some(stat) => + // Reuse AccumProfileResults to avoid generating allocating new objects + val accumProfileResults = AccumProfileResults( + appIndex, + stageId, + accumInfo.infoRef, + min = stat.min, + median = stat.med, + max = stat.max, + total = stat.total) + if (accumInfo.infoRef.name.isDiagnosticMetrics()) { + updateStageDiagnosticMetrics(accumProfileResults) + } + Some(accumProfileResults) + case _ => None } }) - } - }.toSeq + }(breakOut) + } } object AppSQLPlanAnalyzer { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index 33194644e..6b8c3d5e5 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -16,17 +16,17 @@ package com.nvidia.spark.rapids.tool.analysis -import java.util.concurrent.TimeUnit - +import scala.collection.breakOut import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap} import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._ +import com.nvidia.spark.rapids.tool.analysis.util.{AggAccumHelper, AggAccumPhotonHelper} import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper -import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} +import com.nvidia.spark.rapids.tool.profiling._ import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo -import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef, TaskModel} +import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef} /** * Does analysis on the DataFrames from object of AppBase. 
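
For context on the `(breakOut)` calls above: in Scala 2.12, `scala.collection.breakOut` supplies the `CanBuildFrom` so that a `map`/`flatMap`/`collect` builds the target collection type directly, which is what lets the new code drop the intermediate collection and the trailing `.toSeq` used before. A minimal, self-contained sketch with made-up data rather than the tool's types (Scala 2.12 only; `breakOut` was removed in 2.13):

import scala.collection.breakOut

object BreakOutSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical input, standing in for app.jobIdToInfo-style maps in the tool.
    val jobToStages = Map(1 -> Seq(10, 11), 2 -> Seq(12))
    // Old pattern: flatMap over a Map yields an intermediate Iterable that is then copied by .toSeq.
    val copied: Seq[String] =
      jobToStages.flatMap { case (job, stages) => stages.map(s => s"$job-$s") }.toSeq
    // New pattern: breakOut supplies the CanBuildFrom, so the Seq is built in a single traversal.
    val direct: Seq[String] =
      jobToStages.flatMap { case (job, stages) => stages.map(s => s"$job-$s") }(breakOut)
    assert(copied.sorted == direct.sorted)
  }
}
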
@@ -80,60 +80,54 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { * @return sequence of JobAggTaskMetricsProfileResult that contains only Job Ids */ def aggregateSparkMetricsByJob(index: Int): Seq[JobAggTaskMetricsProfileResult] = { - val jobRows = app.jobIdToInfo.flatMap { case (id, jc) => + app.jobIdToInfo.flatMap { case (id, jc) => if (jc.stageIds.isEmpty) { None } else { - val profResultsInJob = stageLevelSparkMetrics(index).filterKeys(jc.stageIds.contains).values - if (profResultsInJob.isEmpty) { + val jobAggAccumulator = new AggAccumHelper() + val perJobRec = jobAggAccumulator.accumPerJob( + jc.stageIds.collect { + case stageId if stageLevelSparkMetrics(index).contains(stageId) => + stageLevelSparkMetrics(index)(stageId) + }) + if (perJobRec.isEmptyAggregates) { None } else { - // Recalculate the duration sum, max, min, avg for the job based on the cached - // stage Profiling results - val tasksInJob = profResultsInJob.map(_.numTasks).sum - val durSum = profResultsInJob.map(_.durationSum).sum - val durMax = - AppSparkMetricsAnalyzer.maxWithEmptyHandling(profResultsInJob.map(_.durationMax)) - val durMin = - AppSparkMetricsAnalyzer.minWithEmptyHandling(profResultsInJob.map(_.durationMin)) - val durAvg = ToolUtils.calculateAverage(durSum, tasksInJob, 1) Some(JobAggTaskMetricsProfileResult(index, id, - tasksInJob, + perJobRec.numTasks, jc.duration, - profResultsInJob.map(_.diskBytesSpilledSum).sum, - durSum, - durMax, - durMin, - durAvg, - profResultsInJob.map(_.executorCPUTimeSum).sum, - profResultsInJob.map(_.executorDeserializeCpuTimeSum).sum, - profResultsInJob.map(_.executorDeserializeTimeSum).sum, - profResultsInJob.map(_.executorRunTimeSum).sum, - profResultsInJob.map(_.inputBytesReadSum).sum, - profResultsInJob.map(_.inputRecordsReadSum).sum, - profResultsInJob.map(_.jvmGCTimeSum).sum, - profResultsInJob.map(_.memoryBytesSpilledSum).sum, - profResultsInJob.map(_.outputBytesWrittenSum).sum, - profResultsInJob.map(_.outputRecordsWrittenSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling( - profResultsInJob.map(_.peakExecutionMemoryMax)), - profResultsInJob.map(_.resultSerializationTimeSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling(profResultsInJob.map(_.resultSizeMax)), - profResultsInJob.map(_.srFetchWaitTimeSum).sum, - profResultsInJob.map(_.srLocalBlocksFetchedSum).sum, - profResultsInJob.map(_.srcLocalBytesReadSum).sum, - profResultsInJob.map(_.srRemoteBlocksFetchSum).sum, - profResultsInJob.map(_.srRemoteBytesReadSum).sum, - profResultsInJob.map(_.srRemoteBytesReadToDiskSum).sum, - profResultsInJob.map(_.srTotalBytesReadSum).sum, - profResultsInJob.map(_.swBytesWrittenSum).sum, - profResultsInJob.map(_.swRecordsWrittenSum).sum, - profResultsInJob.map(_.swWriteTimeSum).sum)) + perJobRec.diskBytesSpilledSum, + perJobRec.durationSum, + perJobRec.durationMax, + perJobRec.durationMin, + perJobRec.durationAvg, + perJobRec.executorCPUTimeSum, + perJobRec.executorDeserializeCpuTimeSum, + perJobRec.executorDeserializeTimeSum, + perJobRec.executorRunTimeSum, + perJobRec.inputBytesReadSum, + perJobRec.inputRecordsReadSum, + perJobRec.jvmGCTimeSum, + perJobRec.memoryBytesSpilledSum, + perJobRec.outputBytesWrittenSum, + perJobRec.outputRecordsWrittenSum, + perJobRec.peakExecutionMemoryMax, + perJobRec.resultSerializationTimeSum, + perJobRec.resultSizeMax, + perJobRec.srFetchWaitTimeSum, + perJobRec.srLocalBlocksFetchedSum, + perJobRec.srLocalBytesReadSum, + perJobRec.srRemoteBlocksFetchSum, + perJobRec.srRemoteBytesReadSum, + 
perJobRec.srRemoteBytesReadToDiskSum, + perJobRec.srTotalBytesReadSum, + perJobRec.swBytesWrittenSum, + perJobRec.swRecordsWrittenSum, + perJobRec.swWriteTimeSum)) } } - } - jobRows.toSeq + }(breakOut) } private case class AverageStageInfo(avgDuration: Double, avgShuffleReadBytes: Double) @@ -169,7 +163,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { tc.taskId, tc.attempt, tc.duration, avg.avgDuration, tc.sr_totalBytesRead, avg.avgShuffleReadBytes, tc.peakExecutionMemory, tc.successful, tc.endReason) } - }.toSeq + }(breakOut) } /** @@ -178,76 +172,64 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { * @return sequence of SQLTaskAggMetricsProfileResult */ def aggregateSparkMetricsBySql(index: Int): Seq[SQLTaskAggMetricsProfileResult] = { - val sqlRows = app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) => + app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) => if (app.sqlIdToStages.contains(sqlId)) { val stagesInSQL = app.sqlIdToStages(sqlId) // TODO: Should we only consider successful tasks? - val cachedResBySQL = stageLevelSparkMetrics(index).filterKeys(stagesInSQL.contains).values - if (cachedResBySQL.isEmpty) { + val sqlAggAccumulator = new AggAccumHelper() + val preSqlRec = sqlAggAccumulator.accumPerSQL( + stagesInSQL.collect { + case stageId if stageLevelSparkMetrics(index).contains(stageId) => + stageLevelSparkMetrics(index)(stageId) + }) + if (preSqlRec.isEmptyAggregates) { None } else { - // Recalculate the duration sum, max, min, avg for the job based on the cached - // stage Profiling results - val tasksInSql = cachedResBySQL.map(_.numTasks).sum - val durSum = cachedResBySQL.map(_.durationSum).sum - val durMax = - AppSparkMetricsAnalyzer.maxWithEmptyHandling(cachedResBySQL.map(_.durationMax)) - val durMin = - AppSparkMetricsAnalyzer.minWithEmptyHandling(cachedResBySQL.map(_.durationMin)) - val durAvg = ToolUtils.calculateAverage(durSum, tasksInSql, 1) - val diskBytes = cachedResBySQL.map(_.diskBytesSpilledSum).sum - val execCpuTime = cachedResBySQL.map(_.executorCPUTimeSum).sum - val execRunTime = cachedResBySQL.map(_.executorRunTimeSum).sum - val execCPURatio = ToolUtils.calculateDurationPercent(execCpuTime, execRunTime) - val inputBytesRead = cachedResBySQL.map(_.inputBytesReadSum).sum // set this here, so make sure we don't get it again until later - sqlCase.sqlCpuTimePercent = execCPURatio - + sqlCase.sqlCpuTimePercent = preSqlRec.executorCpuRatio Some(SQLTaskAggMetricsProfileResult(index, app.appId, sqlId, sqlCase.description, - tasksInSql, + preSqlRec.numTasks, sqlCase.duration, - execCpuTime, - execRunTime, - execCPURatio, - diskBytes, - durSum, - durMax, - durMin, - durAvg, - execCpuTime, - cachedResBySQL.map(_.executorDeserializeCpuTimeSum).sum, - cachedResBySQL.map(_.executorDeserializeTimeSum).sum, - execRunTime, - inputBytesRead, - inputBytesRead * 1.0 / tasksInSql, - cachedResBySQL.map(_.inputRecordsReadSum).sum, - cachedResBySQL.map(_.jvmGCTimeSum).sum, - cachedResBySQL.map(_.memoryBytesSpilledSum).sum, - cachedResBySQL.map(_.outputBytesWrittenSum).sum, - cachedResBySQL.map(_.outputRecordsWrittenSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling( - cachedResBySQL.map(_.peakExecutionMemoryMax)), - cachedResBySQL.map(_.resultSerializationTimeSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling(cachedResBySQL.map(_.resultSizeMax)), - cachedResBySQL.map(_.srFetchWaitTimeSum).sum, - cachedResBySQL.map(_.srLocalBlocksFetchedSum).sum, - cachedResBySQL.map(_.srcLocalBytesReadSum).sum, - 
cachedResBySQL.map(_.srRemoteBlocksFetchSum).sum, - cachedResBySQL.map(_.srRemoteBytesReadSum).sum, - cachedResBySQL.map(_.srRemoteBytesReadToDiskSum).sum, - cachedResBySQL.map(_.srTotalBytesReadSum).sum, - cachedResBySQL.map(_.swBytesWrittenSum).sum, - cachedResBySQL.map(_.swRecordsWrittenSum).sum, - cachedResBySQL.map(_.swWriteTimeSum).sum)) + preSqlRec.executorCPUTimeSum, + preSqlRec.executorRunTimeSum, + preSqlRec.executorCpuRatio, + preSqlRec.diskBytesSpilledSum, + preSqlRec.durationSum, + preSqlRec.durationMax, + preSqlRec.durationMin, + preSqlRec.durationAvg, + preSqlRec.executorCPUTimeSum, + preSqlRec.executorDeserializeCpuTimeSum, + preSqlRec.executorDeserializeTimeSum, + preSqlRec.executorRunTimeSum, + preSqlRec.inputBytesReadSum, + preSqlRec.inputBytesReadAvg, + preSqlRec.inputRecordsReadSum, + preSqlRec.jvmGCTimeSum, + preSqlRec.memoryBytesSpilledSum, + preSqlRec.outputBytesWrittenSum, + preSqlRec.outputRecordsWrittenSum, + preSqlRec.peakExecutionMemoryMax, + preSqlRec.resultSerializationTimeSum, + preSqlRec.resultSizeMax, + preSqlRec.srFetchWaitTimeSum, + preSqlRec.srLocalBlocksFetchedSum, + preSqlRec.srLocalBytesReadSum, + preSqlRec.srRemoteBlocksFetchSum, + preSqlRec.srRemoteBytesReadSum, + preSqlRec.srRemoteBytesReadToDiskSum, + preSqlRec.srTotalBytesReadSum, + preSqlRec.swBytesWrittenSum, + preSqlRec.swRecordsWrittenSum, + preSqlRec.swWriteTimeSum)) } } else { None } - } - sqlRows.toSeq + }(breakOut) } /** @@ -258,7 +240,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { */ def aggregateIOMetricsBySql( sqlMetricsAggs: Seq[SQLTaskAggMetricsProfileResult]): Seq[IOAnalysisProfileResult] = { - val sqlIORows = sqlMetricsAggs.map { sqlAgg => + sqlMetricsAggs.map { sqlAgg => IOAnalysisProfileResult(sqlAgg.appIndex, app.appId, sqlAgg.sqlId, @@ -270,8 +252,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { sqlAgg.memoryBytesSpilledSum, sqlAgg.srTotalBytesReadSum, sqlAgg.swBytesWrittenSum) - } - sqlIORows + }(breakOut) } /** @@ -306,7 +287,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { * @return a sequence of SQLDurationExecutorTimeProfileResult or Empty if None. */ def aggregateDurationAndCPUTimeBySql(index: Int): Seq[SQLDurationExecutorTimeProfileResult] = { - val sqlRows = app.sqlIdToInfo.map { case (sqlId, sqlCase) => + app.sqlIdToInfo.map { case (sqlId, sqlCase) => // First, build the SQLIssues string by retrieving the potential issues from the // app.sqlIDtoProblematic map. val sqlIssues = if (app.sqlIDtoProblematic.contains(sqlId)) { @@ -318,8 +299,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { SQLDurationExecutorTimeProfileResult(index, app.appId, sqlCase.rootExecutionID, sqlId, sqlCase.duration, sqlCase.hasDatasetOrRDD, app.getAppDuration.orElse(Option(0L)), sqlIssues, sqlCase.sqlCpuTimePercent) - } - sqlRows.toSeq + }(breakOut) } /** @@ -339,8 +319,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { app.asInstanceOf[ApplicationInfo].planMetricProcessor } val zeroAccumProfileResults = - AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L) - + AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L) + val emptyNodeNames = Seq.empty[String] + val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults] // TODO: this has stage attempts. we should handle different attempts app.stageManager.getAllStages.map { sm => // TODO: Should we only consider successful tasks? 
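
The per-metric passes being deleted here (`map(_.x).sum`, `maxWithEmptyHandling(...)`, and so on) are what the new `AggAccumHelper`/`TaskMetricsAccumRec` classes later in this diff replace with a single traversal. A rough sketch of that single-pass shape, using illustrative field names rather than the tool's real task model:

object SinglePassAggSketch {
  final case class TaskRec(duration: Long, bytesRead: Long)

  // Mutable accumulator: one traversal updates every aggregate at once instead of
  // re-scanning the task collection once per metric column.
  final class Agg {
    var numTasks = 0
    var durationSum = 0L
    var durationMax = Long.MinValue
    var bytesReadSum = 0L
    def add(t: TaskRec): Unit = {
      numTasks += 1
      durationSum += t.duration
      durationMax = math.max(durationMax, t.duration)
      bytesReadSum += t.bytesRead
    }
    // Mirrors the finalize/reset idea: sentinel max values fall back to 0 when no tasks were seen.
    def finalizeAgg(): Unit = if (numTasks == 0) durationMax = 0L
  }

  def main(args: Array[String]): Unit = {
    val tasks = Seq(TaskRec(10, 100), TaskRec(30, 50), TaskRec(20, 75))
    val agg = new Agg
    tasks.foreach(agg.add)
    agg.finalizeAgg()
    assert(agg.durationSum == 60 && agg.durationMax == 30 && agg.bytesReadSum == 225)
  }
}
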
@@ -348,13 +329,13 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { sm.stageInfo.attemptNumber()) // count duplicate task attempts val numTasks = tasksInStage.size - val nodeNames = sqlAnalyzer.stageToNodeNames. - getOrElse(sm.stageInfo.stageId, Seq.empty[String]) - val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics. - getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]). - withDefaultValue(zeroAccumProfileResults) + val nodeNames = sqlAnalyzer.stageToNodeNames.getOrElse(sm.stageInfo.stageId, emptyNodeNames) + val diagnosticMetricsMap = + sqlAnalyzer.stageToDiagnosticMetrics + .getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics) + .withDefaultValue(zeroAccumProfileResults) val srTotalBytesMetrics = - AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead)) + StatisticsMetrics.createFromArr(tasksInStage.map(_.sr_totalBytesRead)(breakOut)) StageDiagnosticResult(index, app.getAppName, @@ -375,7 +356,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { diagnosticMetricsMap(SW_WRITE_TIME_METRIC), diagnosticMetricsMap(GPU_SEMAPHORE_WAIT_METRIC), nodeNames) - }.toSeq + }(breakOut) } /** @@ -417,10 +398,8 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { // TODO: Should we only consider successful tasks? val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId, sm.stageInfo.attemptNumber()) - // count duplicate task attempts - val numAttempts = tasksInStage.size - val (peakMemoryMax, shuffleWriteTimeSum) = if (app.isPhoton) { + val accumHelperObj = if (app.isPhoton) { // If this a photon app, use the photonHelper // For max peak memory, we need to look at the accumulators at the task level. val peakMemoryValues = tasksInStage.flatMap { taskModel => photonPeakMemoryAccumInfos.flatMap { accumInfo => @@ -431,98 +410,46 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo => accumInfo.stageValuesMap.get(sm.stageInfo.stageId) } - (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues), - TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)) + new AggAccumPhotonHelper(shuffleWriteValues, peakMemoryValues) } else { // For non-Photon apps, use the task metrics directly. - val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory) - val shuffleWriteTime = tasksInStage.map(_.sw_writeTime) - (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues), - shuffleWriteTime.sum) + new AggAccumHelper() } - - val (durSum, durMax, durMin, durAvg) = AppSparkMetricsAnalyzer.getDurations(tasksInStage) + val perStageRec = accumHelperObj.accumPerStage(tasksInStage) val stageRow = StageAggTaskMetricsProfileResult(index, sm.stageInfo.stageId, - numAttempts, // TODO: why is this numAttempts and not numTasks? 
+ // numTasks includes duplicate task attempts + perStageRec.numTasks, sm.duration, - tasksInStage.map(_.diskBytesSpilled).sum, - durSum, - durMax, - durMin, - durAvg, - tasksInStage.map(_.executorCPUTime).sum, - tasksInStage.map(_.executorDeserializeCPUTime).sum, - tasksInStage.map(_.executorDeserializeTime).sum, - tasksInStage.map(_.executorRunTime).sum, - tasksInStage.map(_.input_bytesRead).sum, - tasksInStage.map(_.input_recordsRead).sum, - tasksInStage.map(_.jvmGCTime).sum, - tasksInStage.map(_.memoryBytesSpilled).sum, - tasksInStage.map(_.output_bytesWritten).sum, - tasksInStage.map(_.output_recordsWritten).sum, - peakMemoryMax, - tasksInStage.map(_.resultSerializationTime).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling(tasksInStage.map(_.resultSize)), - tasksInStage.map(_.sr_fetchWaitTime).sum, - tasksInStage.map(_.sr_localBlocksFetched).sum, - tasksInStage.map(_.sr_localBytesRead).sum, - tasksInStage.map(_.sr_remoteBlocksFetched).sum, - tasksInStage.map(_.sr_remoteBytesRead).sum, - tasksInStage.map(_.sr_remoteBytesReadToDisk).sum, - tasksInStage.map(_.sr_totalBytesRead).sum, - tasksInStage.map(_.sw_bytesWritten).sum, - tasksInStage.map(_.sw_recordsWritten).sum, - shuffleWriteTimeSum - ) + perStageRec.diskBytesSpilledSum, + perStageRec.durationSum, + perStageRec.durationMax, + perStageRec.durationMin, + perStageRec.durationAvg, + perStageRec.executorCPUTimeSum, + perStageRec.executorDeserializeCpuTimeSum, + perStageRec.executorDeserializeTimeSum, + perStageRec.executorRunTimeSum, + perStageRec.inputBytesReadSum, + perStageRec.inputRecordsReadSum, + perStageRec.jvmGCTimeSum, + perStageRec.memoryBytesSpilledSum, + perStageRec.outputBytesWrittenSum, + perStageRec.outputRecordsWrittenSum, + perStageRec.peakExecutionMemoryMax, + perStageRec.resultSerializationTimeSum, + perStageRec.resultSizeMax, + perStageRec.srFetchWaitTimeSum, + perStageRec.srLocalBlocksFetchedSum, + perStageRec.srLocalBytesReadSum, + perStageRec.srRemoteBlocksFetchSum, + perStageRec.srRemoteBytesReadSum, + perStageRec.srRemoteBytesReadToDiskSum, + perStageRec.srTotalBytesReadSum, + perStageRec.swBytesWrittenSum, + perStageRec.swRecordsWrittenSum, + perStageRec.swWriteTimeSum) stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow) } } } - - -object AppSparkMetricsAnalyzer { - def getDurations(tcs: Iterable[TaskModel]): (Long, Long, Long, Double) = { - val durations = tcs.map(_.duration) - if (durations.nonEmpty) { - (durations.sum, durations.max, durations.min, - ToolUtils.calculateAverage(durations.sum, durations.size, 1)) - } else { - (0L, 0L, 0L, 0.toDouble) - } - } - - /** - * Given an input iterable, returns its min, median, max and sum. 
- */ - def getStatistics(arr: Iterable[Long]): StatisticsMetrics = { - if (arr.isEmpty) { - StatisticsMetrics(0L, 0L, 0L, 0L) - } else { - val sortedArr = arr.toSeq.sorted - val len = sortedArr.size - val med = if (len % 2 == 0) { - (sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2 - } else { - sortedArr(len / 2) - } - StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum) - } - } - - def maxWithEmptyHandling(arr: Iterable[Long]): Long = { - if (arr.isEmpty) { - 0L - } else { - arr.max - } - } - - def minWithEmptyHandling(arr: Iterable[Long]): Long = { - if (arr.isEmpty) { - 0L - } else { - arr.min - } - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala index 1b88d2d4c..d0a21a6c0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids.tool.analysis +import org.apache.spark.sql.rapids.tool.util.InPlaceMedianArrView.{chooseMidpointPivotInPlace, findMedianInPlace} + // Store (min, median, max, total) for a given metric case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long) @@ -23,4 +25,31 @@ object StatisticsMetrics { // a static variable used to represent zero-statistics instead of allocating a dummy record // on every calculation. val ZERO_RECORD: StatisticsMetrics = StatisticsMetrics(0L, 0L, 0L, 0L) + + def createFromArr(arr: Array[Long]): StatisticsMetrics = { + if (arr.isEmpty) { + return ZERO_RECORD + } + val medV = findMedianInPlace(arr)(chooseMidpointPivotInPlace) + var minV = Long.MaxValue + var maxV = Long.MinValue + var totalV = 0L + arr.foreach { v => + if (v < minV) { + minV = v + } + if (v > maxV) { + maxV = v + } + totalV += v + } + StatisticsMetrics(minV, medV, maxV, totalV) + } + + def createOptionalFromArr(arr: Array[Long]): Option[StatisticsMetrics] = { + if (arr.isEmpty) { + return None + } + Some(createFromArr(arr)) + } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala new file mode 100644 index 000000000..b42ac08b4 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult + +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * A helper class to facilitate the accumulation of aggregate metrics. + * This is a separate class to allow further customization in the future. For example, + * a parellel processor can be used to split the iterables without changing the caller side. 
+ */ +class AggAccumHelper { + + private def accumCachedRecords[R <: TaskMetricsAccumRec]( + stageRecords: Iterable[StageAggTaskMetricsProfileResult], + rec: R): Unit = { + stageRecords.foreach(rec.addRecord) + rec.finalizeAggregation() + } + + protected def createStageAccumRecord(): TaskMetricsAccumRec = { + StageAggAccum() + } + + def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = { + val resRec = createStageAccumRecord() + taskRecords.foreach(resRec.addRecord) + resRec.finalizeAggregation() + resRec + } + + def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = { + val resRec = SQLAggAccum() + accumCachedRecords(stageRecords, resRec) + resRec + } + + def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = { + val resRec = JobAggAccum() + accumCachedRecords(stageRecords, resRec) + resRec + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala new file mode 100644 index 000000000..4f1356960 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +/** + * Implementation of AggAccumHelper for Photon. + * It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those + * values are not available in the TaskModel. + */ +class AggAccumPhotonHelper( + shuffleWriteValues: Iterable[Long], + peakMemValues: Iterable[Long]) extends AggAccumHelper { + + override def createStageAccumRecord(): TaskMetricsAccumRec = { + StageAggPhoton(shuffleWriteValues, peakMemValues) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala new file mode 100644 index 000000000..a8e5b78db --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * Accumulator for Job Aggregates. 
+ * This is an optimization to avoid using the Scala collections API on each field for the entire + * number of tasks/stages in a Job. + */ +case class JobAggAccum() extends TaskMetricsAccumRec { + override def addRecord(rec: TaskModel): Unit = { + throw new UnsupportedOperationException( + "Not implemented: JobAggAccum accepts only cached records") + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala new file mode 100644 index 000000000..b8222679f --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import org.apache.spark.sql.rapids.tool.ToolUtils +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * Accumulator for SQL Aggregates. + * This is an optimization to avoid using the Scala collections API on each field for the entire + * number of tasks/stages in a SQL. + */ +case class SQLAggAccum( + var executorCpuRatio: Double = 0, + // Not added to the output since it is used only by the AutoTuner + var inputBytesReadAvg: Double = 0) extends TaskMetricsAccumRec { + + override def finalizeAggregation(): Unit = { + super.finalizeAggregation() + executorCpuRatio = ToolUtils.calculateDurationPercent(executorCPUTimeSum, executorRunTimeSum) + inputBytesReadAvg = ToolUtils.calculateAverage(inputBytesReadSum, numTasks, 1) + } + + override def addRecord(rec: TaskModel): Unit = { + throw new UnsupportedOperationException( + "Not implemented: SQLAggAccum accepts only cached records") + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala new file mode 100644 index 000000000..c88f1a77d --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult + +/** + * Accumulator for Stage Aggregates. 
+ * This is an optimization to avoid using the Scala collections API on each field for the entire
+ * number of tasks in a Stage.
+ */
+case class StageAggAccum() extends TaskMetricsAccumRec {
+  override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
+    throw new UnsupportedOperationException("Not implemented: Cannot use cached results to " +
+      "calculate stage aggregates")
+  }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala
new file mode 100644
index 000000000..ed7127050
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+import java.util.concurrent.TimeUnit
+
+import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult
+
+/**
+ * Implementation of Accumulator object for Photon.
+ * It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
+ * values are not available in the TaskModel.
+ */
+case class StageAggPhoton(
+    shuffleWriteValues: Iterable[Long],
+    peakMemValues: Iterable[Long]) extends TaskMetricsAccumRec {
+
+  override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
+    throw new UnsupportedOperationException("Not implemented: Cannot use cached results to " +
+      "calculate stage aggregates")
+  }
+
+  override def finalizeAggregation(): Unit = {
+    // Fix the shuffleWriteTimes and the peakMemoryValues to use the shuffleWriteValues and
+    // the peakMemValues.
+    swWriteTimeSum = 0
+    peakExecutionMemoryMax = 0
+    if (!isEmptyAggregates) {
+      // Re-calculate the photon specific fields only if the accumulator has tasks.
+      // Otherwise, leave it as 0.
+      if (shuffleWriteValues.nonEmpty) {
+        swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
+      }
+      if (peakMemValues.nonEmpty) {
+        peakExecutionMemoryMax = peakMemValues.max
+      }
+    }
+    super.finalizeAggregation()
+  }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala
new file mode 100644
index 000000000..b5d98b9ac
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult + +import org.apache.spark.sql.rapids.tool.ToolUtils +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * Accumulator used for task metrics. + * This is an optimization decision to avoid using Scala builtin collections on every field in the + * taskModel. + */ +class TaskMetricsAccumRec { + var numTasks: Int = 0 + var diskBytesSpilledSum: Long = 0 + var durationSum: Long = 0 + var durationMax: Long = Long.MinValue + var durationMin: Long = Long.MaxValue + var durationAvg: Double = 0.0 + var executorCPUTimeSum: Long = 0 + var executorDeserializeCpuTimeSum: Long = 0 + var executorDeserializeTimeSum: Long = 0 + var executorRunTimeSum: Long = 0 + var inputBytesReadSum: Long = 0 + var inputRecordsReadSum: Long = 0 + var jvmGCTimeSum: Long = 0 + var memoryBytesSpilledSum: Long = 0 + var outputBytesWrittenSum: Long = 0 + var outputRecordsWrittenSum: Long = 0 + var peakExecutionMemoryMax: Long = Long.MinValue + var resultSerializationTimeSum: Long = 0 + var resultSizeMax: Long = Long.MinValue + var srFetchWaitTimeSum: Long = 0 + var srLocalBlocksFetchedSum: Long = 0 + var srLocalBytesReadSum: Long = 0 + var srRemoteBlocksFetchSum: Long = 0 + var srRemoteBytesReadSum: Long = 0 + var srRemoteBytesReadToDiskSum: Long = 0 + var srTotalBytesReadSum: Long = 0 + var swBytesWrittenSum: Long = 0 + var swRecordsWrittenSum: Long = 0 + var swWriteTimeSum: Long = 0 + + /** + * Assumption that 0-tasks implies no aggregations on metrics. This means that metrics on + * job/SQL levels won't be accumulated as long as no tasks are accounted for. + */ + def isEmptyAggregates: Boolean = numTasks == 0 + + /** + * Reset all fields to 0. This is used to reset the fields when the Task iterator is empty. + * When the iterator is empty, then fields such as "max" should be reset to 0. 
+ */ + def resetFields(): Unit = { + durationMax = 0 + durationMin = 0 + peakExecutionMemoryMax = 0 + resultSizeMax = 0 + } + + def addRecord(rec: TaskModel): Unit = { + numTasks += 1 + // SumFields + diskBytesSpilledSum += rec.diskBytesSpilled + durationSum += rec.duration + executorCPUTimeSum += rec.executorCPUTime + executorDeserializeCpuTimeSum += rec.executorDeserializeCPUTime + executorDeserializeTimeSum += rec.executorDeserializeTime + executorRunTimeSum += rec.executorRunTime + inputBytesReadSum += rec.input_bytesRead + inputRecordsReadSum += rec.input_recordsRead + jvmGCTimeSum += rec.jvmGCTime + memoryBytesSpilledSum += rec.memoryBytesSpilled + outputBytesWrittenSum += rec.output_bytesWritten + outputRecordsWrittenSum += rec.output_recordsWritten + resultSerializationTimeSum += rec.resultSerializationTime + srFetchWaitTimeSum += rec.sr_fetchWaitTime + srLocalBlocksFetchedSum += rec.sr_localBlocksFetched + srLocalBytesReadSum += rec.sr_localBytesRead + srRemoteBlocksFetchSum += rec.sr_remoteBlocksFetched + srRemoteBytesReadSum += rec.sr_remoteBytesRead + srRemoteBytesReadToDiskSum += rec.sr_remoteBytesReadToDisk + srTotalBytesReadSum += rec.sr_totalBytesRead + swBytesWrittenSum += rec.sw_bytesWritten + swRecordsWrittenSum += rec.sw_recordsWritten + swWriteTimeSum += rec.sw_writeTime + // Max fields + durationMax = math.max(durationMax, rec.duration) + peakExecutionMemoryMax = math.max(peakExecutionMemoryMax, rec.peakExecutionMemory) + resultSizeMax = math.max(resultSizeMax, rec.resultSize) + // Min Fields + durationMin = math.min(durationMin, rec.duration) + } + + def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = { + // Sums + numTasks += rec.numTasks + durationSum += rec.durationSum + diskBytesSpilledSum += rec.diskBytesSpilledSum + executorCPUTimeSum += rec.executorCPUTimeSum + executorRunTimeSum += rec.executorRunTimeSum + inputBytesReadSum += rec.inputBytesReadSum + executorDeserializeCpuTimeSum += rec.executorDeserializeCpuTimeSum + executorDeserializeTimeSum += rec.executorDeserializeTimeSum + inputRecordsReadSum += rec.inputRecordsReadSum + jvmGCTimeSum += rec.jvmGCTimeSum + memoryBytesSpilledSum += rec.memoryBytesSpilledSum + outputBytesWrittenSum += rec.outputBytesWrittenSum + outputRecordsWrittenSum += rec.outputRecordsWrittenSum + resultSerializationTimeSum += rec.resultSerializationTimeSum + srFetchWaitTimeSum += rec.srFetchWaitTimeSum + srLocalBlocksFetchedSum += rec.srLocalBlocksFetchedSum + srLocalBytesReadSum += rec.srcLocalBytesReadSum + srRemoteBlocksFetchSum += rec.srRemoteBlocksFetchSum + srRemoteBytesReadSum += rec.srRemoteBytesReadSum + srRemoteBytesReadToDiskSum += rec.srRemoteBytesReadToDiskSum + srTotalBytesReadSum += rec.srTotalBytesReadSum + swBytesWrittenSum += rec.swBytesWrittenSum + swRecordsWrittenSum += rec.swRecordsWrittenSum + swWriteTimeSum += rec.swWriteTimeSum + // Max + durationMax = math.max(durationMax, rec.durationMax) + peakExecutionMemoryMax = math.max(peakExecutionMemoryMax, rec.peakExecutionMemoryMax) + resultSizeMax = math.max(resultSizeMax, rec.resultSizeMax) + // Min + durationMin = math.min(durationMin, rec.durationMin) + } + + /** + * This method should be called to finalize the accumulations of all the metrics. + * For example, calculating averages and doing any last transformations on a field before the + * results are consumed. 
+ */ + def finalizeAggregation(): Unit = { + durationAvg = ToolUtils.calculateAverage(durationSum, numTasks, 1) + if (numTasks < 1) { + // number of tasks is 0, then we should reset fields such as (max, min) to 0. + resetFields() + } + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 457ddbca7..592fa2cbb 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal -import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase} +import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, Platform, PlatformFactory, ToolBase} import com.nvidia.spark.rapids.tool.tuning.{AutoTuner, ProfilingAutoTunerConfigsProvider} import com.nvidia.spark.rapids.tool.views._ import org.apache.hadoop.conf.Configuration @@ -43,6 +43,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea private val outputCombined: Boolean = appArgs.combined() private val useAutoTuner: Boolean = appArgs.autoTuner() private val outputAlignedSQLIds: Boolean = appArgs.outputSqlIdsAligned() + // Unlike qualification tool, profiler tool does not require platform per app + private val platform: Platform = PlatformFactory.createInstance(appArgs.platform()) override def getNumThreads: Int = appArgs.numThreads.getOrElse( Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) @@ -295,9 +297,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea private def createApp(path: EventLogInfo, index: Int, hadoopConf: Configuration): Either[FailureApp, ApplicationInfo] = { try { - // This apps only contains 1 app in each loop. + // These apps only contains 1 app in each loop. 
val startTime = System.currentTimeMillis() - val app = new ApplicationInfo(hadoopConf, path, index) + val app = new ApplicationInfo(hadoopConf, path, index, platform) EventLogPathProcessor.logApplicationInfo(app) val endTime = System.currentTimeMillis() if (!app.isAppMetaDefined) { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index b90917cd8..971a8711f 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -23,7 +23,7 @@ import scala.collection.immutable import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet, Map} import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent -import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo} +import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo, Platform} import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser} import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DriverAccumCase, JobInfoClass, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase} @@ -42,7 +42,8 @@ import org.apache.spark.util.Utils abstract class AppBase( val eventLogInfo: Option[EventLogInfo], - val hadoopConf: Option[Configuration]) extends Logging + val hadoopConf: Option[Configuration], + val platform: Option[Platform] = None) extends Logging with ClusterTagPropHandler with AccumToStageRetriever { @@ -481,6 +482,7 @@ abstract class AppBase( protected def postCompletion(): Unit = { registerAttemptId() calculateAppDuration() + validateSparkRuntime() } /** @@ -491,6 +493,20 @@ abstract class AppBase( processEventsInternal() postCompletion() } + + /** + * Validates if the spark runtime (parsed from event log) is supported by the platform. + * If the runtime is not supported, an `UnsupportedSparkRuntimeException` + * is thrown. 
+   */
+  private def validateSparkRuntime(): Unit = {
+    val parsedRuntime = getSparkRuntime
+    platform.foreach { p =>
+      if (!p.isRuntimeSupported(parsedRuntime)) {
+        throw UnsupportedSparkRuntimeException(p, parsedRuntime)
+      }
+    }
+  }
 }
 
 object AppBase {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
index 021337495..455f19147 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.rapids.tool
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
+import com.nvidia.spark.rapids.tool.Platform
 import com.nvidia.spark.rapids.tool.planparser.SubqueryExecParser
 import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
 import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter
@@ -28,7 +29,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphNode}
-import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
+import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, ToolsPlanGraph}
 
 object ToolUtils extends Logging {
   // List of recommended file-encodings on the GPUs.
@@ -441,6 +442,12 @@ case class UnsupportedMetricNameException(metricName: String)
   extends AppEventlogProcessException(
     s"Unsupported metric name found in the event log: $metricName")
 
+case class UnsupportedSparkRuntimeException(
+    platform: Platform,
+    sparkRuntime: SparkRuntime.SparkRuntime)
+  extends AppEventlogProcessException(
+    s"Platform '${platform.platformName}' does not support the runtime '$sparkRuntime'")
+
 // Class used a container to hold the information of the Tuple
 // to simplify arguments of methods and caching.
case class SqlPlanInfoGraphEntry( diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala index 83a3cbc0b..6fbf2bb68 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.tool.profiling import scala.collection.Map -import com.nvidia.spark.rapids.tool.EventLogInfo +import com.nvidia.spark.rapids.tool.{EventLogInfo, Platform, PlatformFactory} import com.nvidia.spark.rapids.tool.analysis.AppSQLPlanAnalyzer import org.apache.hadoop.conf.Configuration @@ -184,8 +184,9 @@ object SparkPlanInfoWithStage { class ApplicationInfo( hadoopConf: Configuration, eLogInfo: EventLogInfo, - val index: Int) - extends AppBase(Some(eLogInfo), Some(hadoopConf)) with Logging { + val index: Int, + platform: Platform = PlatformFactory.createInstance()) + extends AppBase(Some(eLogInfo), Some(hadoopConf), Some(platform)) with Logging { private lazy val eventProcessor = new EventsProcessor(this) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 1ef8b7315..e3c33203f 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -41,7 +41,7 @@ class QualificationAppInfo( mlOpsEnabled: Boolean = false, penalizeTransitions: Boolean = true, platform: Platform) - extends AppBase(eventLogInfo, hadoopConf) with Logging { + extends AppBase(eventLogInfo, hadoopConf, Some(platform)) with Logging { var lastJobEndTime: Option[Long] = None var lastSQLEndTime: Option[Long] = None diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala index 0f8e520c6..080a34df3 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.rapids.tool.store -import scala.collection.mutable +import scala.collection.{breakOut, mutable} import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics @@ -98,22 +98,8 @@ class AccumInfo(val infoRef: AccumMetaRef) { } def calculateAccStats(): StatisticsMetrics = { - val sortedTaskUpdates = taskUpdatesMap.values.toSeq.sorted - if (sortedTaskUpdates.isEmpty) { - // do not check stage values because the stats is only meant for task updates - StatisticsMetrics.ZERO_RECORD - } else { - val min = sortedTaskUpdates.head - val max = sortedTaskUpdates.last - val sum = sortedTaskUpdates.sum - val median = if (sortedTaskUpdates.size % 2 == 0) { - val mid = sortedTaskUpdates.size / 2 - (sortedTaskUpdates(mid) + sortedTaskUpdates(mid - 1)) / 2 - } else { - sortedTaskUpdates(sortedTaskUpdates.size / 2) - } - StatisticsMetrics(min, median, max, sum) - } + // do not check stage values because the stats is only meant for task updates + StatisticsMetrics.createFromArr(taskUpdatesMap.map(_._2)(breakOut)) } def getMaxStageValue: Option[Long] = { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala 
b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala index 7b70bedb2..35c9c19e1 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala @@ -27,6 +27,7 @@ case class AccumMetaRef(id: Long, name: AccumNameRef) { } object AccumMetaRef { + val EMPTY_ACCUM_META_REF: AccumMetaRef = new AccumMetaRef(0L, AccumNameRef.EMPTY_ACC_NAME_REF) def apply(id: Long, name: Option[String]): AccumMetaRef = new AccumMetaRef(id, AccumNameRef.getOrCreateAccumNameRef(name)) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala index 0172f5229..4ce41e4a5 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala @@ -42,7 +42,7 @@ case class AccumNameRef(value: String) { object AccumNameRef { // Dummy AccNameRef to represent None accumulator names. This is an optimization to avoid // storing an option[string] for all accumulable names which leads to "get-or-else" everywhere. - private val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A") + val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A") // A global table to store reference to all accumulator names. The map is accessible by all // threads (different applications) running in parallel. This avoids duplicate work across // different threads. diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/InPlaceMedianArrView.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/InPlaceMedianArrView.scala new file mode 100644 index 000000000..1be48a6a7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/InPlaceMedianArrView.scala @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.tool.util + +import scala.annotation.tailrec +import scala.language.postfixOps + +/** + * Allows for in-place partitioning and finding the median. + * The tools used to find the median of a sequence by sorting the entire sequence, then returning + * the elements in the middle. As we started to capture all the accumulators in Spark plans, + * sorting is inefficient for large eventlogs that contain huge number of tasks and + * Accumulables. Thus, this class is an optimized version to get the median in a linear + * complexity while doing it in place to avoid allocating new array to store the sorted elements. + * The code is copied from a Stackoverflow thread: + * https://stackoverflow.com/questions/4662292/scala-median-implementation + * + * Notes: + * - The implementation assumes that the array is not empty. 
+ */ +case class InPlaceMedianArrView(arr: Array[Long], from: Int, until: Int) { + def apply(n: Int): Long = { + if (from + n < until) { + arr(from + n) + } else { + throw new ArrayIndexOutOfBoundsException(n) + } + } + + /** + * Partitions the elements of the view in place around the given predicate. + * @param p a predicate to apply to the elements to drive the partitioning. + * @return a tuple of two views: the first contains the elements that satisfy the predicate, + * and the second contains the rest. + */ + def partitionInPlace(p: Long => Boolean): (InPlaceMedianArrView, InPlaceMedianArrView) = { + var upper = until - 1 + var lower = from + while (lower < upper) { + while (lower < until && p(arr(lower))) lower += 1 + while (upper >= from && !p(arr(upper))) upper -= 1 + if (lower < upper) { val tmp = arr(lower); arr(lower) = arr(upper); arr(upper) = tmp } + } + (copy(until = lower), copy(from = lower)) + } + + def size: Int = { + until - from + } + + def isEmpty: Boolean = { + size <= 0 + } + + override def toString = { + arr mkString ("ArraySize(", ", ", ")") + } +} + +/** + * Companion object for InPlaceMedianArrView. + */ +object InPlaceMedianArrView { + + def apply(arr: Array[Long]): InPlaceMedianArrView = { + InPlaceMedianArrView(arr, 0, arr.size) + } + + /** + * Finds the element with the given 0-based rank k in place (e.g. the median). + * @param arr the Array[Long] to be processed + * @param k the 0-based rank of the element to find + * @param choosePivot a function to choose the pivot element. This is useful to choose different + * strategies. For example, choosing the midpoint works better for sorted + * arrays. + * @return the element with rank k in the array. + */ + @tailrec + def findKMedianInPlace(arr: InPlaceMedianArrView, k: Int) + (implicit choosePivot: InPlaceMedianArrView => Long): Long = { + val a = choosePivot(arr) + val (s, b) = arr partitionInPlace (a >) + if (s.size == k) { + a + } else if (s.isEmpty) { + val (s, b) = arr partitionInPlace (a ==) + if (s.size > k) { + a + } else { + findKMedianInPlace(b, k - s.size) + } + } else if (s.size < k) { + findKMedianInPlace(b, k - s.size) + } else { + findKMedianInPlace(s, k) + } + } + + /** + * Choose a random pivot in the array. This can lead to the worst case for sorted arrays. + * @param arr the array to choose the pivot from. + * @return a random element from the array. + */ + def chooseRandomPivotInPlace(arr: InPlaceMedianArrView): Long = { + arr(scala.util.Random.nextInt(arr.size)) + } + + /** + * Choose the element in the middle as a pivot. This works better for finding the median of + * sorted arrays. + * @param arr the array to choose the pivot from. + * @return the element in the middle of the array. + */ + def chooseMidpointPivotInPlace(arr: InPlaceMedianArrView): Long = { + arr((arr.size - 1) / 2) + } + + /** + * Finds the median of the array in place. + * @param arr the Array[Long] to be processed. + * @param choosePivot a function to choose the pivot element. + * @return the median of the array.
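+ * Illustrative example (values mirror the unit tests in ToolUtilsSuite): an odd-sized input such as + * Array[Long](9, 7, 5, 3, 1) yields the middle element 5, while an even-sized input such as + * Array[Long](11, 9, 7, 5, 3, 1) yields the average of the two middle elements, (5 + 7) / 2 = 6.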
+ */ + def findMedianInPlace( + arr: Array[Long])(implicit choosePivot: InPlaceMedianArrView => Long): Long = { + val midIndex = (arr.size - 1) / 2 + if (arr.size % 2 == 0) { + // For even-length arrays, find the two middle elements and compute their average + val mid1 = findKMedianInPlace(InPlaceMedianArrView(arr), midIndex) + val mid2 = findKMedianInPlace(InPlaceMedianArrView(arr), midIndex + 1) + (mid1 + mid2) / 2 + } else { + // For odd-length arrays, return the middle element + findKMedianInPlace(InPlaceMedianArrView(arr), midIndex) + } + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala index bd5e7bf25..011e5010e 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala @@ -144,12 +144,13 @@ object ToolTestUtils extends Logging { val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]() val appArgs = new ProfileArgs(logs) var index: Int = 1 + val platform = PlatformFactory.createInstance(appArgs.platform()) for (path <- appArgs.eventlog()) { val eventLogInfo = EventLogPathProcessor .getEventLogInfo(path, RapidsToolsConfUtil.newHadoopConf()) - assert(eventLogInfo.size >= 1, s"event log not parsed as expected $path") + assert(eventLogInfo.nonEmpty, s"event log not parsed as expected $path") apps += new ApplicationInfo(RapidsToolsConfUtil.newHadoopConf(), - eventLogInfo.head._1, index) + eventLogInfo.head._1, index, platform) index += 1 } apps diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala index b7966d4d2..6fe3cd2cd 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids.tool.planparser import com.nvidia.spark.rapids.BaseTestSuite -import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory, ToolTestUtils} +import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory, PlatformNames, ToolTestUtils} import com.nvidia.spark.rapids.tool.qualification._ import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo @@ -59,7 +59,8 @@ class BasePlanParserSuite extends BaseTestSuite { } } - def createAppFromEventlog(eventLog: String): QualificationAppInfo = { + def createAppFromEventlog(eventLog: String, + platformName: String = PlatformNames.DEFAULT): QualificationAppInfo = { val hadoopConf = RapidsToolsConfUtil.newHadoopConf() val (_, allEventLogs) = EventLogPathProcessor.processAllPaths( None, None, List(eventLog), hadoopConf) @@ -67,7 +68,7 @@ class BasePlanParserSuite extends BaseTestSuite { assert(allEventLogs.size == 1) val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf, pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true, - PlatformFactory.createInstance()) + PlatformFactory.createInstance(platformName)) appResult match { case Right(app) => app case Left(_) => throw new AssertionError("Cannot create application") diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala index edf8095bc..74f237178 100644 --- 
a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala @@ -16,6 +16,7 @@ package com.nvidia.spark.rapids.tool.planparser +import com.nvidia.spark.rapids.tool.PlatformNames import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker @@ -34,7 +35,7 @@ class PhotonPlanParserSuite extends BasePlanParserSuite { test(s"$photonName is parsed as Spark $sparkName") { val eventLog = s"$qualLogDir/nds_q88_photon_db_13_3.zstd" val pluginTypeChecker = new PluginTypeChecker() - val app = createAppFromEventlog(eventLog) + val app = createAppFromEventlog(eventLog, platformName = PlatformNames.DATABRICKS_AWS) assert(app.sqlPlans.nonEmpty) val parsedPlans = app.sqlPlans.map { case (sqlID, plan) => SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index 2b8c3bf12..b7d8b315f 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool.profiling import java.io.File -import com.nvidia.spark.rapids.tool.ToolTestUtils +import com.nvidia.spark.rapids.tool.{PlatformNames, ToolTestUtils} import com.nvidia.spark.rapids.tool.views.{ProfDataSourceView, RawMetricProfilerView} import org.scalatest.FunSuite @@ -139,7 +139,8 @@ class AnalysisSuite extends FunSuite { s"${fileName}_${metric}_metrics_agg_expectation.csv" } testSqlMetricsAggregation(Array(s"${qualLogDir}/${fileName}.zstd"), - expectFile("sql"), expectFile("job"), expectFile("stage")) + expectFile("sql"), expectFile("job"), expectFile("stage"), + platformName = PlatformNames.DATABRICKS_AWS) } test("test stage-level diagnostic aggregation simple") { @@ -163,8 +164,10 @@ class AnalysisSuite extends FunSuite { } private def testSqlMetricsAggregation(logs: Array[String], expectFileSQL: String, - expectFileJob: String, expectFileStage: String): Unit = { - val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + expectFileJob: String, expectFileStage: String, + platformName: String = PlatformNames.DEFAULT): Unit = { + val args = Array("--platform", platformName) ++ logs + val apps = ToolTestUtils.processProfileApps(args, sparkSession) assert(apps.size == logs.size) val aggResults = RawMetricProfilerView.getAggMetrics(apps) import sparkSession.implicits._ @@ -256,9 +259,12 @@ class AnalysisSuite extends FunSuite { } test("test photon scan metrics") { - val fileName = "nds_q88_photon_db_13_3" - val logs = Array(s"${qualLogDir}/${fileName}.zstd") - val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val args = Array( + "--platform", + PlatformNames.DATABRICKS_AWS, + s"$qualLogDir/nds_q88_photon_db_13_3.zstd" + ) + val apps = ToolTestUtils.processProfileApps(args, sparkSession) val dataSourceResults = ProfDataSourceView.getRawView(apps) assert(dataSourceResults.exists(_.scan_time > 0)) } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index 7ff03a943..1d40472c9 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ 
b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -22,7 +22,7 @@ import java.nio.file.{Files, Paths, StandardOpenOption} import scala.collection.mutable.ArrayBuffer -import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, StatusReportCounts, ToolTestUtils} +import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformNames, StatusReportCounts, ToolTestUtils} import com.nvidia.spark.rapids.tool.views.RawMetricProfilerView import org.apache.hadoop.io.IOUtils import org.scalatest.FunSuite @@ -30,6 +30,7 @@ import org.scalatest.FunSuite import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.{SparkSession, TrampolineUtil} +import org.apache.spark.sql.rapids.tool.UnsupportedSparkRuntimeException import org.apache.spark.sql.rapids.tool.profiling._ import org.apache.spark.sql.rapids.tool.util.{FSUtils, SparkRuntime} @@ -1116,17 +1117,56 @@ class ApplicationInfoSuite extends FunSuite with Logging { } } - val sparkRuntimeTestCases: Seq[(SparkRuntime.Value, String)] = Seq( - SparkRuntime.SPARK -> s"$qualLogDir/nds_q86_test", - SparkRuntime.SPARK_RAPIDS -> s"$logDir/nds_q66_gpu.zstd", - SparkRuntime.PHOTON -> s"$qualLogDir/nds_q88_photon_db_13_3.zstd" + // scalastyle:off line.size.limit + val supportedSparkRuntimeTestCases: Map[String, Seq[(String, SparkRuntime.SparkRuntime)]] = Map( + // tests for standard Spark runtime + s"$qualLogDir/nds_q86_test" -> Seq( + (PlatformNames.DATABRICKS_AWS, SparkRuntime.SPARK), // Expected: SPARK on Databricks AWS + (PlatformNames.ONPREM, SparkRuntime.SPARK) // Expected: SPARK on Onprem + ), + // tests for Spark Rapids runtime + s"$logDir/nds_q66_gpu.zstd" -> Seq( + (PlatformNames.DATABRICKS_AWS, SparkRuntime.SPARK_RAPIDS), // Expected: SPARK_RAPIDS on Databricks AWS + (PlatformNames.ONPREM, SparkRuntime.SPARK_RAPIDS) // Expected: SPARK_RAPIDS on Onprem + ), + // tests for Photon runtime with fallback to SPARK for unsupported platforms + s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq( + (PlatformNames.DATABRICKS_AWS, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks AWS + (PlatformNames.DATABRICKS_AZURE, SparkRuntime.PHOTON) // Expected: PHOTON on Databricks Azure + ) ) + // scalastyle:on line.size.limit + + supportedSparkRuntimeTestCases.foreach { case (logPath, platformRuntimeCases) => + val baseFileName = logPath.split("/").last + platformRuntimeCases.foreach { case (platform, expectedRuntime) => + test(s"test eventlog $baseFileName on $platform has supported runtime: $expectedRuntime") { + val args = Array("--platform", platform, logPath) + val apps = ToolTestUtils.processProfileApps(args, sparkSession) + assert(apps.size == 1) + assert(apps.head.getSparkRuntime == expectedRuntime) + } + } + } - sparkRuntimeTestCases.foreach { case (expectedSparkRuntime, eventLog) => - test(s"test spark runtime property for ${expectedSparkRuntime.toString} eventlog") { - val apps = ToolTestUtils.processProfileApps(Array(eventLog), sparkSession) - assert(apps.size == 1) - assert(apps.head.getSparkRuntime == expectedSparkRuntime) + // scalastyle:off line.size.limit + val unsupportedSparkRuntimeTestCases: Map[String, Seq[String]] = Map( + s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq( + PlatformNames.ONPREM, // Expected: PHOTON runtime on Onprem is not supported + PlatformNames.DATAPROC // Expected: PHOTON runtime on Dataproc is not supported + ) + ) + // scalastyle:on line.size.limit + + unsupportedSparkRuntimeTestCases.foreach { case (logPath, 
platformNames) => + val baseFileName = logPath.split("/").last + platformNames.foreach { platform => + test(s"test eventlog $baseFileName on $platform has unsupported runtime") { + val args = Array("--platform", platform, logPath) + intercept[UnsupportedSparkRuntimeException] { + ToolTestUtils.processProfileApps(args, sparkSession) + } + } } } } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 03943d463..6de463db1 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -136,12 +136,15 @@ class QualificationSuite extends BaseTestSuite { } } - private def runQualificationTest(eventLogs: Array[String], expectFileName: String = "", + private def runQualificationTest(eventLogs: Array[String], + expectFileName: String = "", platformName: String = PlatformNames.DEFAULT, shouldReturnEmpty: Boolean = false, expectPerSqlFileName: Option[String] = None, expectedStatus: Option[StatusReportCounts] = None): Unit = { TrampolineUtil.withTempDir { outpath => val qualOutputPrefix = "rapids_4_spark_qualification_output" val outputArgs = Array( + "--platform", + platformName, "--output-directory", outpath.getAbsolutePath()) @@ -1762,7 +1765,8 @@ class QualificationSuite extends BaseTestSuite { val logFiles = Array(s"$logDir/nds_q88_photon_db_13_3.zstd") // photon event log // Status counts: 1 SUCCESS, 0 FAILURE, 0 SKIPPED, 0 UNKNOWN val expectedStatus = Some(StatusReportCounts(1, 0, 0, 0)) - runQualificationTest(logFiles, expectedStatus = expectedStatus) + runQualificationTest(logFiles, platformName = PlatformNames.DATABRICKS_AWS, + expectedStatus = expectedStatus) } test("process multiple attempts of the same app ID and skip lower attempts") { diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala index baba6eb79..5e1b6558b 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala @@ -24,13 +24,13 @@ import scala.concurrent.duration._ import scala.xml.XML import com.nvidia.spark.rapids.tool.profiling.{ProfileOutputWriter, ProfileResult} +import org.scalatest.AppendedClues.convertToClueful import org.scalatest.FunSuite import org.scalatest.Matchers.{contain, convertToAnyShouldWrapper, equal, not} import org.apache.spark.internal.Logging import org.apache.spark.sql.TrampolineUtil -import org.apache.spark.sql.rapids.tool.util.{FSUtils, RapidsToolsConfUtil, StringUtils, WebCrawlerUtil} - +import org.apache.spark.sql.rapids.tool.util.{FSUtils, InPlaceMedianArrView, RapidsToolsConfUtil, StringUtils, WebCrawlerUtil} class ToolUtilsSuite extends FunSuite with Logging { test("get page links of a url") { @@ -210,6 +210,27 @@ class ToolUtilsSuite extends FunSuite with Logging { } } + test("Finding median of arrays") { + val testSet: Map[String, (Array[Long], Long)] = Map( + "All same values" -> (Array[Long](5, 5, 5, 5) -> 5L), + "Odd number of values [9, 7, 5, 3, 1]" -> (Array[Long](9, 7, 5, 3, 1) -> 5L), + "Even number of values [11, 9, 7, 5, 3, 1]" -> (Array[Long](11, 9, 7, 5, 3, 1) -> 6), + "Even number of values(2) [15, 13, 11, 9, 7, 5, 3, 1]" -> + (Array[Long](15, 13, 11, 9, 7, 5, 3, 1) -> 8), + "Even number of 
values(3) [3, 13, 11, 9, 7, 5, 15, 1]" -> + (Array[Long](3, 13, 11, 9, 7, 5, 15, 1) -> 8), + "Single element" -> (Array[Long](1) -> 1), + "Two elements" -> (Array[Long](1, 2).reverse -> 1) + ) + for ((desc, (arr, expectedMedian)) <- testSet) { + val actualMedian = + InPlaceMedianArrView.findMedianInPlace(arr)(InPlaceMedianArrView.chooseMidpointPivotInPlace) + actualMedian shouldBe expectedMedian withClue s"Failed for $desc. " + + s"Expected: $expectedMedian, " + + s"Actual: $actualMedian" + } + } + case class MockProfileResults(appID: String, appIndex: Int, nonEnglishField: String, parentIDs: String) extends ProfileResult { override val outputHeaders: Seq[String] = Seq("appID", "appIndex", "nonEnglishField", diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index 0d46e5025..e50659b46 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -35,9 +35,10 @@ class ToolsCLI(object): # pylint: disable=too-few-public-methods """ def qualification(self, + *, # force named arguments + platform: str, eventlogs: str = None, cluster: str = None, - platform: str = None, output_folder: str = None, filter_apps: str = None, custom_model_file: str = None, @@ -55,6 +56,8 @@ def qualification(self, The cmd will process each app individually, but will group apps with the same name into the same output row after averaging duration metrics accordingly. + :param platform: Defines one of the following: "onprem", "emr", "dataproc", "dataproc-gke", + "databricks-aws", and "databricks-azure". :param eventlogs: Event log filenames or CSP storage directories containing event logs (comma separated). @@ -62,8 +65,6 @@ def qualification(self, cluster name on the CSP. :param cluster: The CPU cluster on which the Spark application(s) were executed. Name or ID (for databricks platforms) of cluster or path to cluster-properties. - :param platform: Defines one of the following: "onprem", "emr", "dataproc", "dataproc-gke", - "databricks-aws", and "databricks-azure". :param output_folder: Local path to store the output. :param tools_jar: Path to a bundled jar including Rapids tool. The path is a local filesystem, or remote cloud storage url. 
If missing, the wrapper downloads the latest rapids-4-spark-tools_*.jar @@ -89,8 +90,8 @@ def qualification(self, For more details on Qualification tool options, please visit https://docs.nvidia.com/spark-rapids/user-guide/latest/qualification/jar-usage.html#running-the-qualification-tool-standalone-on-spark-event-logs """ - eventlogs = Utils.get_value_or_pop(eventlogs, rapids_options, 'e') platform = Utils.get_value_or_pop(platform, rapids_options, 'p') + eventlogs = Utils.get_value_or_pop(eventlogs, rapids_options, 'e') tools_jar = Utils.get_value_or_pop(tools_jar, rapids_options, 't') output_folder = Utils.get_value_or_pop(output_folder, rapids_options, 'o') filter_apps = Utils.get_value_or_pop(filter_apps, rapids_options, 'f') @@ -108,9 +109,9 @@ def qualification(self, if estimation_model_args is None: return None qual_args = AbsToolUserArgModel.create_tool_args('qualification', + platform=platform, eventlogs=eventlogs, cluster=cluster, - platform=platform, output_folder=output_folder, tools_jar=tools_jar, jvm_heap_size=jvm_heap_size, @@ -127,9 +128,10 @@ def qualification(self, return None def profiling(self, + *, # force named arguments + platform: str, eventlogs: str = None, cluster: str = None, - platform: str = None, driverlog: str = None, output_folder: str = None, tools_jar: str = None, @@ -146,14 +148,14 @@ def profiling(self, The tool also will recommend setting for the application assuming that the job will be able to use all the cluster resources (CPU and GPU) when it is running. + :param platform: defines one of the following "onprem", "emr", "dataproc", "databricks-aws", + and "databricks-azure". :param eventlogs: Event log filenames or cloud storage directories containing event logs (comma separated). If missing, the wrapper reads the Spark's property `spark.eventLog.dir` defined in the `cluster`. :param cluster: The cluster on which the Spark applications were executed. The argument can be a cluster name or ID (for databricks platforms) or a valid path to the cluster's properties file (json format) generated by the CSP SDK. - :param platform: defines one of the following "onprem", "emr", "dataproc", "databricks-aws", - and "databricks-azure". :param driverlog: Valid path to the GPU driver log file. :param output_folder: path to store the output. :param tools_jar: Path to a bundled jar including Rapids tool. 
The path is a local filesystem, @@ -173,9 +175,9 @@ def profiling(self, For more details on Profiling tool options, please visit https://docs.nvidia.com/spark-rapids/user-guide/latest/profiling/jar-usage.html#prof-tool-title-options """ + platform = Utils.get_value_or_pop(platform, rapids_options, 'p') eventlogs = Utils.get_value_or_pop(eventlogs, rapids_options, 'e') cluster = Utils.get_value_or_pop(cluster, rapids_options, 'c') - platform = Utils.get_value_or_pop(platform, rapids_options, 'p') driverlog = Utils.get_value_or_pop(driverlog, rapids_options, 'd') output_folder = Utils.get_value_or_pop(output_folder, rapids_options, 'o') tools_jar = Utils.get_value_or_pop(tools_jar, rapids_options, 't') @@ -184,9 +186,9 @@ def profiling(self, ToolLogging.enable_debug_mode() init_environment('prof') prof_args = AbsToolUserArgModel.create_tool_args('profiling', + platform=platform, eventlogs=eventlogs, cluster=cluster, - platform=platform, driverlog=driverlog, jvm_heap_size=jvm_heap_size, jvm_threads=jvm_threads, diff --git a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature index cd66b0bb6..fc7ec2a52 100644 --- a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature +++ b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature @@ -16,6 +16,7 @@ Feature: Event Log Processing @test_id_ELP_0001 Scenario Outline: Tool spark_rapids runs with different types of event logs + Given platform is "" When spark-rapids tool is executed with "" eventlogs Then stderr contains the following """ @@ -25,12 +26,13 @@ Feature: Event Log Processing And return code is "0" Examples: - | event_logs | expected_stderr | processed_apps_count | - | invalid_path_eventlog | process.failure.count = 1;invalid_path_eventlog not found, skipping! | 0 | - | gpu_eventlog.zstd | process.skipped.count = 1;GpuEventLogException: Cannot parse event logs from GPU run: skipping this file | 0 | - | photon_eventlog.zstd | process.success.count = 1; | 1 | - | streaming_eventlog.zstd | process.skipped.count = 1;StreamingEventLogException: Encountered Spark Structured Streaming Job: skipping this file! | 0 | - | incorrect_app_status_eventlog.zstd | process.NA.count = 1;IncorrectAppStatusException: Application status is incorrect. Missing AppInfo | 0 | + | platform | event_logs | expected_stderr | processed_apps_count | + | onprem | invalid_path_eventlog | process.failure.count = 1;invalid_path_eventlog not found, skipping! | 0 | + | onprem | gpu_eventlog.zstd | process.skipped.count = 1;GpuEventLogException: Cannot parse event logs from GPU run: skipping this file | 0 | + | onprem | streaming_eventlog.zstd | process.skipped.count = 1;StreamingEventLogException: Encountered Spark Structured Streaming Job: skipping this file! | 0 | + | onprem | incorrect_app_status_eventlog.zstd | process.NA.count = 1;IncorrectAppStatusException: Application status is incorrect. Missing AppInfo | 0 | + | onprem | photon_eventlog.zstd | process.skipped.count = 1;UnsupportedSparkRuntimeException: Platform 'onprem' does not support the runtime 'PHOTON' | 0 | + | databricks-aws | photon_eventlog.zstd | process.success.count = 1; | 1 | @test_id_ELP_0002 Scenario: Qualification tool JAR crashes