diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index f0f6824da..e57f5700e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -16,13 +16,16 @@ package com.nvidia.spark.rapids.tool.analysis +import java.util.concurrent.TimeUnit + import scala.collection.mutable +import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult} import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo -import org.apache.spark.sql.rapids.tool.store.TaskModel +import org.apache.spark.sql.rapids.tool.store.{AccumInfo, TaskModel} /** * Does analysis on the DataFrames from object of AppBase. @@ -408,12 +411,62 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { */ private def aggregateSparkMetricsByStageInternal(index: Int): Unit = { // TODO: this has stage attempts. we should handle different attempts + + // For Photon apps, peak memory and shuffle write time need to be calculated from accumulators + // instead of task metrics. + // Approach: + // 1. Collect accumulators for each metric type. + // 2. For each stage, retrieve the relevant accumulators and calculate aggregated values. + // Note: + // - A HashMap could be used instead of separate mutable.ArrayBuffer for each metric type, + // but avoiding it for readability. + val photonPeakMemoryAccumInfos = mutable.ArrayBuffer[AccumInfo]() + val photonShuffleWriteTimeAccumInfos = mutable.ArrayBuffer[AccumInfo]() + + if (app.isPhoton) { + app.accumManager.applyToAccumInfoMap { accumInfo => + accumInfo.infoRef.name.value match { + case name if name.contains( + DatabricksParseHelper.PHOTON_METRIC_PEAK_MEMORY_LABEL) => + // Collect accumulators for peak memory + photonPeakMemoryAccumInfos += accumInfo + case name if name.contains( + DatabricksParseHelper.PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL) => + // Collect accumulators for shuffle write time + photonShuffleWriteTimeAccumInfos += accumInfo + case _ => // Ignore other accumulators + } + } + } + app.stageManager.getAllStages.foreach { sm => // TODO: Should we only consider successful tasks? val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId, sm.stageInfo.attemptNumber()) // count duplicate task attempts val numAttempts = tasksInStage.size + + val (peakMemoryMax, shuffleWriteTimeSum) = if (app.isPhoton) { + // For max peak memory, we need to look at the accumulators at the task level. + val peakMemoryValues = tasksInStage.flatMap { taskModel => + photonPeakMemoryAccumInfos.flatMap { accumInfo => + accumInfo.taskUpdatesMap.get(taskModel.taskId) + } + } + // For sum of shuffle write time, we need to look at the accumulators at the stage level. + val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo => + accumInfo.stageValuesMap.get(sm.stageInfo.stageId) + } + (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues), + TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)) + } else { + // For non-Photon apps, use the task metrics directly. + val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory) + val shuffleWriteTime = tasksInStage.map(_.sw_writeTime) + (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues), + shuffleWriteTime.sum) + } + val (durSum, durMax, durMin, durAvg) = AppSparkMetricsAnalyzer.getDurations(tasksInStage) val stageRow = StageAggTaskMetricsProfileResult(index, sm.stageInfo.stageId, @@ -434,7 +487,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { tasksInStage.map(_.memoryBytesSpilled).sum, tasksInStage.map(_.output_bytesWritten).sum, tasksInStage.map(_.output_recordsWritten).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling(tasksInStage.map(_.peakExecutionMemory)), + peakMemoryMax, tasksInStage.map(_.resultSerializationTime).sum, AppSparkMetricsAnalyzer.maxWithEmptyHandling(tasksInStage.map(_.resultSize)), tasksInStage.map(_.sr_fetchWaitTime).sum, @@ -446,7 +499,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { tasksInStage.map(_.sr_totalBytesRead).sum, tasksInStage.map(_.sw_bytesWritten).sum, tasksInStage.map(_.sw_recordsWritten).sum, - tasksInStage.map(_.sw_writeTime).sum + shuffleWriteTimeSum ) stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala index 4916d3529..886c9986b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala @@ -17,15 +17,19 @@ package com.nvidia.spark.rapids.tool.planparser import java.nio.file.Paths +import java.util.concurrent.TimeUnit import scala.util.control.NonFatal import scala.util.matching.Regex +import com.nvidia.spark.rapids.tool.profiling.SQLAccumProfileResults +import com.nvidia.spark.rapids.tool.views.IoMetrics import org.json4s.{DefaultFormats, Formats} import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods.parse import org.apache.spark.internal.Logging +import org.apache.spark.sql.rapids.tool.UnsupportedMetricNameException import org.apache.spark.sql.rapids.tool.util.UTF8Source // Utilities used to handle Databricks and Photon Ops @@ -48,6 +52,13 @@ object DatabricksParseHelper extends Logging { val SUB_PROP_JOB_ID = "JobId" val SUB_PROP_RUN_NAME = "RunName" + // scalastyle:off + // Photon metric labels that are used as alternatives to Spark metric labels + val PHOTON_METRIC_CUMULATIVE_TIME_LABEL = "cumulative time" // Alternative for "scan time" + val PHOTON_METRIC_PEAK_MEMORY_LABEL = "peak memory usage" // Alternative for "peak execution memory" + val PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL = "part of shuffle file write" // Alternative for "shuffle write time" + // scalastyle:on + private val PHOTON_PATTERN: Regex = "Photon[a-zA-Z]*".r private val PHOTON_OPS_MAPPING_DIR = "photonOperatorMappings" // TODO: Create separate mapping file for different Photon/Databricks versions @@ -147,4 +158,21 @@ object DatabricksParseHelper extends Logging { def mapPhotonToSpark(inputStr: String): String = { PHOTON_PATTERN.replaceAllIn(inputStr, m => photonToSparkMapping.getOrElse(m.matched, m.matched)) } + + /** + * Checks if 'accum' is a Photon I/O metric. + */ + def isPhotonIoMetric(accum: SQLAccumProfileResults): Boolean = + accum.name == PHOTON_METRIC_CUMULATIVE_TIME_LABEL && accum.nodeName.contains("Scan") + + /** + * Updates the I/O metrics for Photon apps based on the accumulator values. + */ + def updatePhotonIoMetric(accum: SQLAccumProfileResults, ioMetrics: IoMetrics): Unit = { + accum.name match { + case PHOTON_METRIC_CUMULATIVE_TIME_LABEL if accum.nodeName.contains("Scan") => + ioMetrics.scanTime = TimeUnit.NANOSECONDS.toMillis(accum.total) + case _ => throw UnsupportedMetricNameException(accum.name) + } + } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/DataSourceView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/DataSourceView.scala index 157af2d0b..ff9a0699c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/DataSourceView.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/DataSourceView.scala @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids.tool.views import scala.collection.Seq import com.nvidia.spark.rapids.tool.analysis.{ProfAppIndexMapperTrait, QualAppIndexMapperTrait} +import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper import com.nvidia.spark.rapids.tool.profiling.{DataSourceProfileResult, SQLAccumProfileResults} import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer @@ -40,6 +41,12 @@ object IoMetrics { val DECODE_TIME_LABEL = "GPU decode time" val EMPTY_IO_METRICS: IoMetrics = IoMetrics(0, 0, 0, 0) + + /** + * Get all labels for IoMetrics + */ + def getAllLabels: Seq[String] = Seq( + BUFFER_TIME_LABEL, SCAN_TIME_LABEL, DATA_SIZE_LABEL, DECODE_TIME_LABEL) } trait AppDataSourceViewTrait extends ViewableTrait[DataSourceProfileResult] { @@ -48,11 +55,13 @@ trait AppDataSourceViewTrait extends ViewableTrait[DataSourceProfileResult] { private def getIoMetrics(sqlAccums: Seq[SQLAccumProfileResults]): IoMetrics = { val finalRes = IoMetrics(0, 0, 0, 0) try { - sqlAccums.map(accum => accum.name match { + sqlAccums.foreach(accum => accum.name match { case IoMetrics.BUFFER_TIME_LABEL => finalRes.bufferTime = accum.total case IoMetrics.SCAN_TIME_LABEL => finalRes.scanTime = accum.total case IoMetrics.DATA_SIZE_LABEL => finalRes.dataSize = accum.total case IoMetrics.DECODE_TIME_LABEL => finalRes.decodeTime = accum.total + case _ if DatabricksParseHelper.isPhotonIoMetric(accum) => + DatabricksParseHelper.updatePhotonIoMetric(accum, finalRes) case _ => throw UnsupportedMetricNameException(accum.name) }) } catch { @@ -98,11 +107,9 @@ trait AppDataSourceViewTrait extends ViewableTrait[DataSourceProfileResult] { index: Int, appSqlAccums: Seq[SQLAccumProfileResults]): Seq[DataSourceProfileResult] = { // Filter appSqlAccums to get only required metrics - val dataSourceMetrics = appSqlAccums.filter( - sqlAccum => sqlAccum.name.contains(IoMetrics.BUFFER_TIME_LABEL) - || sqlAccum.name.contains(IoMetrics.SCAN_TIME_LABEL) - || sqlAccum.name.contains(IoMetrics.DECODE_TIME_LABEL) - || sqlAccum.name.equals(IoMetrics.DATA_SIZE_LABEL)) + val dataSourceMetrics = appSqlAccums.filter(sqlAccum => + IoMetrics.getAllLabels.contains(sqlAccum.name) || + app.isPhoton && DatabricksParseHelper.isPhotonIoMetric(sqlAccum)) val dsFromLastPlan = app.dataSourceInfo.map { ds => val sqlIdtoDs = dataSourceMetrics.filter( diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumManager.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumManager.scala index aeac4eeb3..14f60fe29 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumManager.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumManager.scala @@ -66,4 +66,11 @@ class AccumManager { def getMaxStageValue(id: Long): Option[Long] = { accumInfoMap.get(id).map(_.getMaxStageValue.get) } + + /** + * Applies the function `f` to each AccumInfo in the accumInfoMap. + */ + def applyToAccumInfoMap(f: AccumInfo => Unit): Unit = { + accumInfoMap.values.foreach(f) + } } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala index d39323c7a..a5babf35d 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.rapids.tool.util import java.io.{PrintWriter, StringWriter} +import java.lang.management.ManagementFactory + +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` import com.nvidia.spark.rapids.tool.ToolTextFileWriter import org.apache.hadoop.conf.Configuration @@ -49,6 +52,12 @@ object RuntimeUtil extends Logging { // Add the Spark version used in runtime. // Note that it is different from the Spark version used in the build. buildProps.setProperty("runtime.spark.version", ToolUtils.sparkRuntimeVersion) + // Add the JVM and OS information + getJVMOSInfo.foreach { + kv => buildProps.setProperty(s"runtime.${kv._1}", kv._2) + } + // get the JVM memory arguments + getJVMHeapArguments.foreach(kv => buildProps.setProperty(s"runtime.${kv._1}", kv._2)) val reportWriter = new ToolTextFileWriter(outputDir, REPORT_FILE_NAME, REPORT_LABEL, hadoopConf) try { reportWriter.writeProperties(buildProps, REPORT_LABEL) @@ -73,6 +82,25 @@ object RuntimeUtil extends Logging { "os.version" -> System.getProperty("os.version") ) } -} - + def getJVMHeapArguments: Map[String, String] = { + ManagementFactory.getRuntimeMXBean.getInputArguments.filter( + p => p.startsWith("-Xmx") || p.startsWith("-Xms") || p.startsWith("-XX:")).map { + sizeArg => + if (sizeArg.startsWith("-Xmx")) { + ("jvm.arg.heap.max", sizeArg.drop(4)) + } else if (sizeArg.startsWith("-Xms")) { + ("jvm.arg.heap.min", sizeArg.drop(4)) + } else { // this is heap argument + // drop the first "-XX:" + val dropSize = if (sizeArg.startsWith("-XX:+")) 5 else 4 + val parts = sizeArg.drop(dropSize).split("=") + if (parts.length == 2) { + (s"jvm.arg.gc.${parts(0)}", parts(1)) + } else { + (s"jvm.arg.gc.${parts(0)}", "") + } + } + }.toMap + } +} diff --git a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_job_metrics_agg_expectation.csv b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_job_metrics_agg_expectation.csv new file mode 100644 index 000000000..97de9ad08 --- /dev/null +++ b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_job_metrics_agg_expectation.csv @@ -0,0 +1,58 @@ +appIndex,jobId,numTasks,Duration,diskBytesSpilled_sum,duration_sum,duration_max,duration_min,duration_avg,executorCPUTime_sum,executorDeserializeCPUTime_sum,executorDeserializeTime_sum,executorRunTime_sum,input_bytesRead_sum,input_recordsRead_sum,jvmGCTime_sum,memoryBytesSpilled_sum,output_bytesWritten_sum,output_recordsWritten_sum,peakExecutionMemory_max,resultSerializationTime_sum,resultSize_max,sr_fetchWaitTime_sum,sr_localBlocksFetched_sum,sr_localBytesRead_sum,sr_remoteBlocksFetched_sum,sr_remoteBytesRead_sum,sr_remoteBytesReadToDisk_sum,sr_totalBytesRead_sum,sw_bytesWritten_sum,sw_recordsWritten_sum,sw_writeTime_sum +1,48,431,237976,0,371230,1032,333,861.3,343288,1261,1286,367785,190131859,8639936081,160,0,0,0,169667947,21,15767,8,3,15657,3,15657,0,31314,18964,431,16 +1,47,432,214657,0,376777,1133,499,872.2,346230,1272,1310,373271,14007280,8639936081,144,0,0,0,159530057,11,15577,4,3,15657,3,15657,0,31314,19008,432,20 +1,46,433,191440,0,457364,3323,391,1056.3,352763,1508,2509,451358,5242628040,8639936081,1912,0,0,0,250840493,9,16203,551,3,15657,3,15657,0,31314,19052,433,16 +1,49,1,186241,0,266,266,266,266.0,86,1,1,261,0,0,0,0,0,0,138414192,0,5344,10,209,9196,222,9768,0,18964,44,1,0 +1,45,433,166081,0,415849,1448,339,960.4,349795,1302,1375,412139,2276478144,8639936081,568,0,0,0,195992906,2,15780,7,3,15657,3,15657,0,31314,19052,433,34 +1,44,431,139667,0,398973,1403,365,925.7,354119,1281,1327,395265,1075691986,8639936081,328,0,0,0,188587155,0,15767,10,3,15657,3,15657,0,31314,18964,431,17 +1,50,1,122711,0,267,267,267,267.0,71,1,1,262,0,0,0,0,0,0,138414192,0,5343,58,219,9636,213,9372,0,19008,44,1,0 +1,43,432,114755,0,403652,1369,329,934.4,353325,1290,1326,399766,1395949742,8639936081,624,0,0,0,201771890,13,15767,14,3,15657,3,15657,0,31314,19008,432,16 +1,51,1,97958,0,386,386,386,386.0,60,1,1,381,0,0,0,0,0,0,138414192,0,5343,154,221,9724,210,9240,0,18964,44,1,0 +1,42,431,89634,0,616500,1899,589,1430.4,378074,1330,1515,612098,16461920726,8639936081,4132,0,0,0,216740322,23,15805,10,3,15657,3,15657,0,31314,18964,431,16 +1,52,1,71718,0,384,384,384,384.0,54,1,1,379,0,0,0,0,0,0,138414192,0,5343,170,223,9812,210,9240,0,19052,44,1,0 +1,41,431,51085,0,759623,2321,918,1762.5,394996,1460,2027,754015,26337468742,8639936081,7772,0,0,0,250648581,87,16157,170,3,15657,3,15657,0,31314,18964,431,19 +1,53,1,46297,0,136,136,136,136.0,57,1,1,131,0,0,0,0,0,0,138414192,0,5344,0,214,9416,219,9636,0,19052,44,1,0 +1,54,1,23051,0,340,340,340,340.0,36,1,1,334,0,0,0,0,0,0,138414192,0,5343,223,215,9460,217,9548,0,19008,44,1,0 +1,31,1,6979,0,6738,6738,6738,6738.0,5104,128,688,6035,349526,86400,53,0,0,0,155563380,1,10759,0,0,0,0,0,0,0,7239,1800,0 +1,34,1,6953,0,6725,6725,6725,6725.0,479,185,677,6036,349526,86400,53,0,0,0,155563380,0,9814,0,0,0,0,0,0,0,7239,1800,0 +1,33,1,6940,0,6729,6729,6729,6729.0,206,216,679,6035,349526,86400,53,0,0,0,155563380,1,9896,0,0,0,0,0,0,0,7239,1800,0 +1,35,1,6925,0,6729,6729,6729,6729.0,157,136,681,6035,12261,1350,53,0,0,0,155199546,1,9839,0,0,0,0,0,0,0,699,165,0 +1,38,1,6855,0,6743,6743,6743,6743.0,187,256,688,6035,349526,86400,53,0,0,0,155563380,1,9927,0,0,0,0,0,0,0,7239,1800,0 +1,0,1,6033,0,5699,5699,5699,5699.0,422,948,1114,4382,0,0,37,0,0,0,0,8,2794,0,0,0,0,0,0,0,0,0,0 +1,13,200,5707,0,87661,966,349,438.3,9821,427,951,84265,0,0,144,0,0,0,0,9,6258,0,0,0,0,0,0,0,0,0,0 +1,23,200,5479,0,84240,490,355,421.2,5290,200,214,82784,0,0,136,0,0,0,0,0,6214,0,0,0,0,0,0,0,0,0,0 +1,21,200,5271,0,80904,485,353,404.5,6004,203,220,79384,0,0,136,0,0,0,0,1,6302,0,0,0,0,0,0,0,0,0,0 +1,27,200,4728,0,70760,442,309,353.8,4042,200,209,69494,0,0,152,0,0,0,0,10,5788,0,0,0,0,0,0,0,0,0,0 +1,3,1,4708,0,4693,4693,4693,4693.0,280,701,804,3796,0,0,26,0,0,0,0,7,2834,0,0,0,0,0,0,0,0,0,0 +1,25,200,4603,0,70379,569,314,351.9,4106,200,216,69040,0,0,168,0,0,0,0,14,5708,0,0,0,0,0,0,0,0,0,0 +1,36,1,4556,0,4332,4332,4332,4332.0,3359,95,401,3907,30328,7200,39,0,0,0,155245068,1,10552,0,0,0,0,0,0,0,7719,1920,0 +1,29,200,4555,0,69682,423,310,348.4,3730,200,218,68521,0,0,168,0,0,0,0,9,5748,0,0,0,0,0,0,0,0,0,0 +1,32,1,4515,0,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0 +1,39,1,4488,0,4322,4322,4322,4322.0,112,124,392,3907,349526,86400,39,0,0,0,155563380,1,9926,0,0,0,0,0,0,0,7239,1800,0 +1,37,1,4481,0,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0 +1,40,1,4476,0,4327,4327,4327,4327.0,98,147,394,3907,349526,86400,39,0,0,0,155563380,1,9895,0,0,0,0,0,0,0,7239,1800,0 +1,56,1,1055,0,1022,1022,1022,1022.0,758,77,95,901,0,0,0,0,0,0,134218344,6,10091,5,218,9592,220,9680,0,19272,0,0,0 +1,19,200,803,0,11895,145,38,59.5,943,209,252,10017,0,0,56,0,0,0,0,22,2739,0,0,0,0,0,0,0,0,0,0 +1,26,1,316,0,312,312,312,312.0,2,1,1,306,0,0,0,0,0,0,0,0,3777,0,0,0,0,0,0,0,0,0,0 +1,2,1,280,0,267,267,267,267.0,6,4,4,124,0,0,0,0,0,0,0,1,3342,0,0,0,0,0,0,0,0,0,0 +1,11,1,264,0,254,254,254,254.0,5,3,3,241,0,0,0,0,0,0,0,0,2913,0,0,0,0,0,0,0,0,0,0 +1,7,1,240,0,227,227,227,227.0,5,4,4,213,0,0,114,0,0,0,0,0,2206,0,0,0,0,0,0,0,0,0,0 +1,1,1,209,0,173,173,173,173.0,28,5,6,152,0,0,0,0,0,0,0,0,2506,0,0,0,0,0,0,0,0,0,0 +1,5,1,179,0,165,165,165,165.0,4,4,4,151,0,0,0,0,0,0,0,0,2475,0,0,0,0,0,0,0,0,0,0 +1,14,1,151,0,143,143,143,143.0,3,1,1,132,0,0,0,0,0,0,0,1,3120,0,0,0,0,0,0,0,0,0,0 +1,4,1,147,0,139,139,139,139.0,22,5,6,121,0,0,0,0,0,0,0,0,2334,0,0,0,0,0,0,0,0,0,0 +1,20,1,141,0,137,137,137,137.0,1,1,1,130,0,0,0,0,0,0,0,0,2170,0,0,0,0,0,0,0,0,0,0 +1,28,1,140,0,136,136,136,136.0,2,1,1,130,0,0,0,0,0,0,0,0,3784,0,0,0,0,0,0,0,0,0,0 +1,18,1,129,0,124,124,124,124.0,2,1,1,116,0,0,0,0,0,0,0,0,2501,0,0,0,0,0,0,0,0,0,0 +1,16,1,125,0,117,117,117,117.0,2,1,1,108,0,0,0,0,0,0,0,0,2758,0,0,0,0,0,0,0,0,0,0 +1,6,1,123,0,113,113,113,113.0,4,3,3,100,0,0,0,0,0,0,0,0,2208,0,0,0,0,0,0,0,0,0,0 +1,10,1,120,0,110,110,110,110.0,6,3,3,98,0,0,0,0,0,0,0,0,3565,0,0,0,0,0,0,0,0,0,0 +1,9,1,114,0,104,104,104,104.0,5,3,3,90,0,0,0,0,0,0,0,1,3514,0,0,0,0,0,0,0,0,0,0 +1,12,1,105,0,85,85,85,85.0,4,3,3,72,0,0,0,0,0,0,0,0,3369,0,0,0,0,0,0,0,0,0,0 +1,17,1,103,0,97,97,97,97.0,2,2,2,89,0,0,0,0,0,0,0,0,3003,0,0,0,0,0,0,0,0,0,0 +1,8,1,102,0,95,95,95,95.0,4,3,3,82,0,0,0,0,0,0,0,0,3142,0,0,0,0,0,0,0,0,0,0 +1,30,1,73,0,67,67,67,67.0,2,1,1,62,0,0,0,0,0,0,0,0,3199,0,0,0,0,0,0,0,0,0,0 +1,24,1,72,0,59,59,59,59.0,2,1,1,51,0,0,0,0,0,0,0,0,3288,0,0,0,0,0,0,0,0,0,0 +1,22,1,70,0,65,65,65,65.0,2,1,1,59,0,0,0,0,0,0,0,0,3436,0,0,0,0,0,0,0,0,0,0 +1,55,1,65,0,54,54,54,54.0,27,1,1,49,0,0,0,0,0,0,138414192,0,5343,0,216,9504,215,9460,0,18964,44,1,0 +1,15,1,64,0,58,58,58,58.0,2,1,1,50,0,0,0,0,0,0,0,0,2306,0,0,0,0,0,0,0,0,0,0 diff --git a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_sql_metrics_agg_expectation.csv b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_sql_metrics_agg_expectation.csv new file mode 100644 index 000000000..63aaf7a1b --- /dev/null +++ b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_sql_metrics_agg_expectation.csv @@ -0,0 +1,2 @@ +appIndex,appID,sqlID,description,numTasks,Duration,executorCPUTime,executorRunTime,executorCPURatio,diskBytesSpilled_sum,duration_sum,duration_max,duration_min,duration_avg,executorCPUTime_sum,executorDeserializeCPUTime_sum,executorDeserializeTime_sum,executorRunTime_sum,input_bytesRead_sum,input_recordsRead_sum,jvmGCTime_sum,memoryBytesSpilled_sum,output_bytesWritten_sum,output_recordsWritten_sum,peakExecutionMemory_max,resultSerializationTime_sum,resultSize_max,sr_fetchWaitTime_sum,sr_localBlocksFetched_sum,sr_localBytesRead_sum,sr_remoteBlocksFetched_sum,sr_remoteBytesRead_sum,sr_remoteBytesReadToDisk_sum,sr_totalBytesRead_sum,sw_bytesWritten_sum,sw_recordsWritten_sum,sw_writeTime_sum +1,"app-20240919162642-0000",26,"query88",3472,250542,2883837,3818106,75.53,0,3858136,6743,54,1111.2,2883837,12349,18186,3818106,52997115316,69120188398,16100,0,0,0,250840493,181,16203,1394,1759,201596,1750,201200,0,402796,218614,19946,154 diff --git a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_stage_metrics_agg_expectation.csv b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_stage_metrics_agg_expectation.csv new file mode 100644 index 000000000..ae34c1ac0 --- /dev/null +++ b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_stage_metrics_agg_expectation.csv @@ -0,0 +1,58 @@ +appIndex,stageId,numTasks,Duration,diskBytesSpilled_sum,duration_sum,duration_max,duration_min,duration_avg,executorCPUTime_sum,executorDeserializeCPUTime_sum,executorDeserializeTime_sum,executorRunTime_sum,input_bytesRead_sum,input_recordsRead_sum,jvmGCTime_sum,memoryBytesSpilled_sum,output_bytesWritten_sum,output_recordsWritten_sum,peakExecutionMemory_max,resultSerializationTime_sum,resultSize_max,sr_fetchWaitTime_sum,sr_localBlocksFetched_sum,sr_localBytesRead_sum,sr_remoteBlocksFetched_sum,sr_remoteBytesRead_sum,sr_remoteBytesReadToDisk_sum,sr_totalBytesRead_sum,sw_bytesWritten_sum,sw_recordsWritten_sum,sw_writeTime_sum +1,58,431,237799,0,371230,1032,333,861.3,343288,1261,1286,367785,190131859,8639936081,160,0,0,0,169667947,21,15767,8,3,15657,3,15657,0,31314,18964,431,16 +1,54,432,214633,0,376777,1133,499,872.2,346230,1272,1310,373271,14007280,8639936081,144,0,0,0,159530057,11,15577,4,3,15657,3,15657,0,31314,19008,432,20 +1,44,433,191384,0,457364,3323,391,1056.3,352763,1508,2509,451358,5242628040,8639936081,1912,0,0,0,250840493,9,16203,551,3,15657,3,15657,0,31314,19052,433,16 +1,61,1,186240,0,266,266,266,266.0,86,1,1,261,0,0,0,0,0,0,138414192,0,5344,10,209,9196,222,9768,0,18964,44,1,0 +1,46,433,166015,0,415849,1448,339,960.4,349795,1302,1375,412139,2276478144,8639936081,568,0,0,0,195992906,2,15780,7,3,15657,3,15657,0,31314,19052,433,34 +1,50,431,139628,0,398973,1403,365,925.7,354119,1281,1327,395265,1075691986,8639936081,328,0,0,0,188587155,0,15767,10,3,15657,3,15657,0,31314,18964,431,17 +1,64,1,122708,0,267,267,267,267.0,71,1,1,262,0,0,0,0,0,0,138414192,0,5343,58,219,9636,213,9372,0,19008,44,1,0 +1,48,432,114722,0,403652,1369,329,934.4,353325,1290,1326,399766,1395949742,8639936081,624,0,0,0,201771890,13,15767,14,3,15657,3,15657,0,31314,19008,432,16 +1,67,1,97957,0,386,386,386,386.0,60,1,1,381,0,0,0,0,0,0,138414192,0,5343,154,221,9724,210,9240,0,18964,44,1,0 +1,56,431,89600,0,616500,1899,589,1430.4,378074,1330,1515,612098,16461920726,8639936081,4132,0,0,0,216740322,23,15805,10,3,15657,3,15657,0,31314,18964,431,16 +1,70,1,71716,0,384,384,384,384.0,54,1,1,379,0,0,0,0,0,0,138414192,0,5343,170,223,9812,210,9240,0,19052,44,1,0 +1,52,431,51060,0,759623,2321,918,1762.5,394996,1460,2027,754015,26337468742,8639936081,7772,0,0,0,250648581,87,16157,170,3,15657,3,15657,0,31314,18964,431,19 +1,73,1,46297,0,136,136,136,136.0,57,1,1,131,0,0,0,0,0,0,138414192,0,5344,0,214,9416,219,9636,0,19052,44,1,0 +1,76,1,23048,0,340,340,340,340.0,36,1,1,334,0,0,0,0,0,0,138414192,0,5343,223,215,9460,217,9548,0,19008,44,1,0 +1,31,1,6956,0,6738,6738,6738,6738.0,5104,128,688,6035,349526,86400,53,0,0,0,155563380,1,10759,0,0,0,0,0,0,0,7239,1800,0 +1,32,1,6945,0,6725,6725,6725,6725.0,479,185,677,6036,349526,86400,53,0,0,0,155563380,0,9814,0,0,0,0,0,0,0,7239,1800,0 +1,33,1,6930,0,6729,6729,6729,6729.0,206,216,679,6035,349526,86400,53,0,0,0,155563380,1,9896,0,0,0,0,0,0,0,7239,1800,0 +1,34,1,6907,0,6729,6729,6729,6729.0,157,136,681,6035,12261,1350,53,0,0,0,155199546,1,9839,0,0,0,0,0,0,0,699,165,0 +1,38,1,6842,0,6743,6743,6743,6743.0,187,256,688,6035,349526,86400,53,0,0,0,155563380,1,9927,0,0,0,0,0,0,0,7239,1800,0 +1,0,1,5904,0,5699,5699,5699,5699.0,422,948,1114,4382,0,0,37,0,0,0,0,8,2794,0,0,0,0,0,0,0,0,0,0 +1,13,200,5697,0,87661,966,349,438.3,9821,427,951,84265,0,0,144,0,0,0,0,9,6258,0,0,0,0,0,0,0,0,0,0 +1,23,200,5476,0,84240,490,355,421.2,5290,200,214,82784,0,0,136,0,0,0,0,0,6214,0,0,0,0,0,0,0,0,0,0 +1,21,200,5265,0,80904,485,353,404.5,6004,203,220,79384,0,0,136,0,0,0,0,1,6302,0,0,0,0,0,0,0,0,0,0 +1,27,200,4719,0,70760,442,309,353.8,4042,200,209,69494,0,0,152,0,0,0,0,10,5788,0,0,0,0,0,0,0,0,0,0 +1,3,1,4701,0,4693,4693,4693,4693.0,280,701,804,3796,0,0,26,0,0,0,0,7,2834,0,0,0,0,0,0,0,0,0,0 +1,25,200,4599,0,70379,569,314,351.9,4106,200,216,69040,0,0,168,0,0,0,0,14,5708,0,0,0,0,0,0,0,0,0,0 +1,29,200,4552,0,69682,423,310,348.4,3730,200,218,68521,0,0,168,0,0,0,0,9,5748,0,0,0,0,0,0,0,0,0,0 +1,35,1,4525,0,4332,4332,4332,4332.0,3359,95,401,3907,30328,7200,39,0,0,0,155245068,1,10552,0,0,0,0,0,0,0,7719,1920,0 +1,36,1,4509,0,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0 +1,39,1,4474,0,4322,4322,4322,4322.0,112,124,392,3907,349526,86400,39,0,0,0,155563380,1,9926,0,0,0,0,0,0,0,7239,1800,0 +1,40,1,4469,0,4327,4327,4327,4327.0,98,147,394,3907,349526,86400,39,0,0,0,155563380,1,9895,0,0,0,0,0,0,0,7239,1800,0 +1,37,1,4464,0,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0 +1,107,1,1052,0,1022,1022,1022,1022.0,758,77,95,901,0,0,0,0,0,0,134218344,6,10091,5,218,9592,220,9680,0,19272,0,0,0 +1,19,200,794,0,11895,145,38,59.5,943,209,252,10017,0,0,56,0,0,0,0,22,2739,0,0,0,0,0,0,0,0,0,0 +1,26,1,315,0,312,312,312,312.0,2,1,1,306,0,0,0,0,0,0,0,0,3777,0,0,0,0,0,0,0,0,0,0 +1,2,1,276,0,267,267,267,267.0,6,4,4,124,0,0,0,0,0,0,0,1,3342,0,0,0,0,0,0,0,0,0,0 +1,11,1,260,0,254,254,254,254.0,5,3,3,241,0,0,0,0,0,0,0,0,2913,0,0,0,0,0,0,0,0,0,0 +1,7,1,238,0,227,227,227,227.0,5,4,4,213,0,0,114,0,0,0,0,0,2206,0,0,0,0,0,0,0,0,0,0 +1,1,1,202,0,173,173,173,173.0,28,5,6,152,0,0,0,0,0,0,0,0,2506,0,0,0,0,0,0,0,0,0,0 +1,5,1,177,0,165,165,165,165.0,4,4,4,151,0,0,0,0,0,0,0,0,2475,0,0,0,0,0,0,0,0,0,0 +1,14,1,148,0,143,143,143,143.0,3,1,1,132,0,0,0,0,0,0,0,1,3120,0,0,0,0,0,0,0,0,0,0 +1,4,1,145,0,139,139,139,139.0,22,5,6,121,0,0,0,0,0,0,0,0,2334,0,0,0,0,0,0,0,0,0,0 +1,20,1,140,0,137,137,137,137.0,1,1,1,130,0,0,0,0,0,0,0,0,2170,0,0,0,0,0,0,0,0,0,0 +1,28,1,139,0,136,136,136,136.0,2,1,1,130,0,0,0,0,0,0,0,0,3784,0,0,0,0,0,0,0,0,0,0 +1,18,1,127,0,124,124,124,124.0,2,1,1,116,0,0,0,0,0,0,0,0,2501,0,0,0,0,0,0,0,0,0,0 +1,16,1,122,0,117,117,117,117.0,2,1,1,108,0,0,0,0,0,0,0,0,2758,0,0,0,0,0,0,0,0,0,0 +1,6,1,120,0,113,113,113,113.0,4,3,3,100,0,0,0,0,0,0,0,0,2208,0,0,0,0,0,0,0,0,0,0 +1,10,1,118,0,110,110,110,110.0,6,3,3,98,0,0,0,0,0,0,0,0,3565,0,0,0,0,0,0,0,0,0,0 +1,9,1,110,0,104,104,104,104.0,5,3,3,90,0,0,0,0,0,0,0,1,3514,0,0,0,0,0,0,0,0,0,0 +1,17,1,102,0,97,97,97,97.0,2,2,2,89,0,0,0,0,0,0,0,0,3003,0,0,0,0,0,0,0,0,0,0 +1,8,1,100,0,95,95,95,95.0,4,3,3,82,0,0,0,0,0,0,0,0,3142,0,0,0,0,0,0,0,0,0,0 +1,12,1,98,0,85,85,85,85.0,4,3,3,72,0,0,0,0,0,0,0,0,3369,0,0,0,0,0,0,0,0,0,0 +1,30,1,71,0,67,67,67,67.0,2,1,1,62,0,0,0,0,0,0,0,0,3199,0,0,0,0,0,0,0,0,0,0 +1,22,1,68,0,65,65,65,65.0,2,1,1,59,0,0,0,0,0,0,0,0,3436,0,0,0,0,0,0,0,0,0,0 +1,24,1,68,0,59,59,59,59.0,2,1,1,51,0,0,0,0,0,0,0,0,3288,0,0,0,0,0,0,0,0,0,0 +1,81,1,63,0,54,54,54,54.0,27,1,1,49,0,0,0,0,0,0,138414192,0,5343,0,216,9504,215,9460,0,18964,44,1,0 +1,15,1,62,0,58,58,58,58.0,2,1,1,50,0,0,0,0,0,0,0,0,2306,0,0,0,0,0,0,0,0,0,0 diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index 70a1ebbbe..97fe97741 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.profiling import java.io.File import com.nvidia.spark.rapids.tool.ToolTestUtils -import com.nvidia.spark.rapids.tool.views.RawMetricProfilerView +import com.nvidia.spark.rapids.tool.views.{ProfDataSourceView, RawMetricProfilerView} import org.scalatest.FunSuite import org.apache.spark.sql.{DataFrame, SparkSession} @@ -66,6 +66,15 @@ class AnalysisSuite extends FunSuite { expectFile("sql"), expectFile("job"), expectFile("stage")) } + test("test photon sql metrics aggregation") { + val fileName = "nds_q88_photon_db_13_3" + val expectFile = (metric: String) => { + s"${fileName}_${metric}_metrics_agg_expectation.csv" + } + testSqlMetricsAggregation(Array(s"${qualLogDir}/${fileName}.zstd"), + expectFile("sql"), expectFile("job"), expectFile("stage")) + } + test("test stage-level diagnostic aggregation simple") { val expectFile = "rapids_join_eventlog_stagediagnosticmetricsagg_expectation.csv" val logs = Array(s"$logDir/rapids_join_eventlog.zstd") @@ -176,4 +185,12 @@ class AnalysisSuite extends FunSuite { val metrics = aggResults.sqlDurAggs metrics.foreach(m => assert(m.appDuration.get == 0L)) } + + test("test photon scan metrics") { + val fileName = "nds_q88_photon_db_13_3" + val logs = Array(s"${qualLogDir}/${fileName}.zstd") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val dataSourceResults = ProfDataSourceView.getRawView(apps) + assert(dataSourceResults.exists(_.scan_time > 0)) + } } diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 55b91d81c..5f8d5cf1a 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -878,11 +878,16 @@ def _re_evaluate_platform_args(self, tool_name: str) -> dict: job_args = self.ctxt.get_ctxt('jobArgs') result = copy.deepcopy(job_args) job_resources = self._get_job_submission_resources(tool_name) + jvm_min_heap = job_resources['jvmMinHeapSize'] jvm_max_heap = job_resources['jvmMaxHeapSize'] - jvm_heap_key = f'Xmx{jvm_max_heap}g' + jvm_max_heap_key = f'Xmx{jvm_max_heap}g' + jvm_min_heap_key = f'Xms{jvm_min_heap}g' # At this point, we need to set the heap argument for the JVM. Otherwise, the process uses # its default values. - result['platformArgs']['jvmArgs'].update({jvm_heap_key: ''}) + result['platformArgs']['jvmArgs'].update({ + jvm_min_heap_key: '', + jvm_max_heap_key: '' + }) return result @timeit('Building Job Arguments and Executing Job CMD') # pylint: disable=too-many-function-args diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py index ab90c34ea..cc449a4d0 100644 --- a/user_tools/src/spark_rapids_tools/utils/util.py +++ b/user_tools/src/spark_rapids_tools/utils/util.py @@ -291,13 +291,18 @@ def adjust_tools_resources(cls, else: prof_threads = max(1, jvm_threads - num_threads_unit) if concurrent_mode else jvm_threads + # calculate the min heap size based on the max heap size + min_heap = max(1, heap_unit // 2) + return { 'qualification': { 'jvmMaxHeapSize': heap_unit, + 'jvmMinHeapSize': min_heap, 'rapidsThreads': num_threads_unit }, 'profiling': { 'jvmMaxHeapSize': prof_heap, + 'jvmMinHeapSize': min_heap, 'rapidsThreads': prof_threads } }