From d55430f0eaad23ddf7aeeedf4402c47d2b6d4e89 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 12 Nov 2024 15:27:32 -0800 Subject: [PATCH 1/4] Add platform specific runtime check Signed-off-by: Partho Sarthi --- .../nvidia/spark/rapids/tool/Platform.scala | 19 ++++++++- .../rapids/tool/profiling/Profiler.scala | 8 ++-- .../spark/sql/rapids/tool/AppBase.scala | 28 +++++++++++-- .../tool/profiling/ApplicationInfo.scala | 7 ++-- .../qualification/QualificationAppInfo.scala | 2 +- .../spark/rapids/tool/ToolTestUtils.scala | 5 ++- .../tool/profiling/ApplicationInfoSuite.scala | 42 ++++++++++++++----- 7 files changed, 87 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala index 6bf62a537..c76e20fe2 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala @@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.tool.profiling.ClusterProperties import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo} -import org.apache.spark.sql.rapids.tool.util.StringUtils +import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils} /** * Utility object containing constants for various platform names. @@ -132,6 +132,19 @@ abstract class Platform(var gpuDevice: Option[GpuDevice], var recommendedClusterInfo: Option[RecommendedClusterInfo] = None // the number of GPUs to use, this might be updated as we handle different cases var numGpus: Int = 1 + // Default runtime for the platform + val defaultRuntime: SparkRuntime.SparkRuntime = SparkRuntime.SPARK + // Set of supported runtimes for the platform + protected val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set( + SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS + ) + + /** + * Checks if the given runtime is supported by the platform. 
+ */ + def isRuntimeSupported(runtime: SparkRuntime.SparkRuntime): Boolean = { + supportedRuntimes.contains(runtime) + } // This function allow us to have one gpu type used by the auto // tuner recommendations but have a different GPU used for speedup @@ -506,6 +519,10 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice], override val defaultGpuDevice: GpuDevice = T4Gpu override def isPlatformCSP: Boolean = true + override val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set( + SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS, SparkRuntime.PHOTON + ) + // note that Databricks generally sets the spark.executor.memory for the user. Our // auto tuner heuristics generally sets it lower then Databricks so go ahead and // allow our auto tuner to take affect for this in anticipation that we will use more diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index bad5524e3..1b7d1cced 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal -import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase} +import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, Platform, PlatformFactory, ToolBase} import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps import com.nvidia.spark.rapids.tool.views._ import org.apache.hadoop.conf.Configuration @@ -43,6 +43,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea private val outputCombined: Boolean = appArgs.combined() private val useAutoTuner: Boolean = 
appArgs.autoTuner() private val outputAlignedSQLIds: Boolean = appArgs.outputSqlIdsAligned() + // Unlike qualification tool, profiler tool does not platform per app + private val platform: Platform = PlatformFactory.createInstance(appArgs.platform()) override def getNumThreads: Int = appArgs.numThreads.getOrElse( Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) @@ -286,9 +288,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea private def createApp(path: EventLogInfo, index: Int, hadoopConf: Configuration): Either[FailureApp, ApplicationInfo] = { try { - // This apps only contains 1 app in each loop. + // These apps only contains 1 app in each loop. val startTime = System.currentTimeMillis() - val app = new ApplicationInfo(hadoopConf, path, index) + val app = new ApplicationInfo(hadoopConf, path, index, platform) EventLogPathProcessor.logApplicationInfo(app) val endTime = System.currentTimeMillis() if (!app.isAppMetaDefined) { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index e3313b832..eb1f69fc7 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -23,7 +23,7 @@ import scala.collection.immutable import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet, Map} import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent -import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo} +import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo, Platform} import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser} import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DriverAccumCase, 
JobInfoClass, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase} @@ -37,12 +37,13 @@ import org.apache.spark.scheduler.{SparkListenerEvent, StageInfo} import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.SparkPlanGraphNode import org.apache.spark.sql.rapids.tool.store.{AccumManager, DataSourceRecord, SQLPlanModelManager, StageModel, StageModelManager, TaskModelManager} -import org.apache.spark.sql.rapids.tool.util.{EventUtils, RapidsToolsConfUtil, ToolsPlanGraph, UTF8Source} +import org.apache.spark.sql.rapids.tool.util.{EventUtils, RapidsToolsConfUtil, SparkRuntime, ToolsPlanGraph, UTF8Source} import org.apache.spark.util.Utils abstract class AppBase( val eventLogInfo: Option[EventLogInfo], - val hadoopConf: Option[Configuration]) extends Logging with ClusterTagPropHandler { + val hadoopConf: Option[Configuration], + val platform: Option[Platform] = None) extends Logging with ClusterTagPropHandler { var appMetaData: Option[AppMetaData] = None @@ -485,6 +486,27 @@ abstract class AppBase( processEventsInternal() postCompletion() } + + /** + * Returns the SparkRuntime environment in which the application is being executed. + * This is calculated based on other cached properties. + * + * If the platform is provided, and it does not support the parsed runtime, + * the method will log a warning and fall back to the platform’s default runtime. + */ + override def getSparkRuntime: SparkRuntime.SparkRuntime = { + val parsedRuntime = super.getSparkRuntime + platform.map { p => + if (p.isRuntimeSupported(parsedRuntime)) { + parsedRuntime + } else { + logWarning(s"Application $appId: Platform '${p.platformName}' does not support " + + s"the parsed runtime '$parsedRuntime'. 
Falling back to default runtime - " + + s"'${p.defaultRuntime}'.") + p.defaultRuntime + } + }.getOrElse(parsedRuntime) + } } object AppBase { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala index 83a3cbc0b..6fbf2bb68 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.tool.profiling import scala.collection.Map -import com.nvidia.spark.rapids.tool.EventLogInfo +import com.nvidia.spark.rapids.tool.{EventLogInfo, Platform, PlatformFactory} import com.nvidia.spark.rapids.tool.analysis.AppSQLPlanAnalyzer import org.apache.hadoop.conf.Configuration @@ -184,8 +184,9 @@ object SparkPlanInfoWithStage { class ApplicationInfo( hadoopConf: Configuration, eLogInfo: EventLogInfo, - val index: Int) - extends AppBase(Some(eLogInfo), Some(hadoopConf)) with Logging { + val index: Int, + platform: Platform = PlatformFactory.createInstance()) + extends AppBase(Some(eLogInfo), Some(hadoopConf), Some(platform)) with Logging { private lazy val eventProcessor = new EventsProcessor(this) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 887075c8c..d2ac79ea2 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -41,7 +41,7 @@ class QualificationAppInfo( mlOpsEnabled: Boolean = false, penalizeTransitions: Boolean = true, platform: Platform) - extends AppBase(eventLogInfo, hadoopConf) with Logging { + extends AppBase(eventLogInfo, hadoopConf, Some(platform)) 
with Logging { var lastJobEndTime: Option[Long] = None var lastSQLEndTime: Option[Long] = None diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala index bd5e7bf25..011e5010e 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala @@ -144,12 +144,13 @@ object ToolTestUtils extends Logging { val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]() val appArgs = new ProfileArgs(logs) var index: Int = 1 + val platform = PlatformFactory.createInstance(appArgs.platform()) for (path <- appArgs.eventlog()) { val eventLogInfo = EventLogPathProcessor .getEventLogInfo(path, RapidsToolsConfUtil.newHadoopConf()) - assert(eventLogInfo.size >= 1, s"event log not parsed as expected $path") + assert(eventLogInfo.nonEmpty, s"event log not parsed as expected $path") apps += new ApplicationInfo(RapidsToolsConfUtil.newHadoopConf(), - eventLogInfo.head._1, index) + eventLogInfo.head._1, index, platform) index += 1 } apps diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index de9921cec..ec8fb7ab8 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -22,7 +22,7 @@ import java.nio.file.{Files, Paths, StandardOpenOption} import scala.collection.mutable.ArrayBuffer -import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, StatusReportCounts, ToolTestUtils} +import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformNames, StatusReportCounts, ToolTestUtils} import com.nvidia.spark.rapids.tool.views.RawMetricProfilerView import org.apache.hadoop.io.IOUtils import org.scalatest.FunSuite @@ 
-1116,17 +1116,37 @@ class ApplicationInfoSuite extends FunSuite with Logging { } } - val sparkRuntimeTestCases: Seq[(SparkRuntime.Value, String)] = Seq( - SparkRuntime.SPARK -> s"$qualLogDir/nds_q86_test", - SparkRuntime.SPARK_RAPIDS -> s"$logDir/nds_q66_gpu.zstd", - SparkRuntime.PHOTON -> s"$qualLogDir/nds_q88_photon_db_13_3.zstd" + // scalastyle:off line.size.limit + val sparkRuntimeTestCases: Map[String, Seq[(String, SparkRuntime.Value)]] = Map( + // tests for standard Spark runtime + s"$qualLogDir/nds_q86_test" -> Seq( + (PlatformNames.DATABRICKS_AWS, SparkRuntime.SPARK), // Expected: SPARK on Databricks AWS + (PlatformNames.ONPREM, SparkRuntime.SPARK) // Expected: SPARK on Onprem + ), + // tests for Spark Rapids runtime + s"$logDir/nds_q66_gpu.zstd" -> Seq( + (PlatformNames.DATABRICKS_AWS, SparkRuntime.SPARK_RAPIDS), // Expected: SPARK_RAPIDS on Databricks AWS + (PlatformNames.ONPREM, SparkRuntime.SPARK_RAPIDS) // Expected: SPARK_RAPIDS on Onprem + ), + // tests for Photon runtime with fallback to SPARK for unsupported platforms + s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq( + (PlatformNames.DATABRICKS_AWS, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks AWS + (PlatformNames.DATABRICKS_AZURE, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks Azure + (PlatformNames.ONPREM, SparkRuntime.SPARK), // Expected: Fallback to SPARK on Onprem + (PlatformNames.DATAPROC, SparkRuntime.SPARK) // Expected: Fallback to SPARK on Dataproc + ) ) - - sparkRuntimeTestCases.foreach { case (expectedSparkRuntime, eventLog) => - test(s"test spark runtime property for ${expectedSparkRuntime.toString} eventlog") { - val apps = ToolTestUtils.processProfileApps(Array(eventLog), sparkSession) - assert(apps.size == 1) - assert(apps.head.getSparkRuntime == expectedSparkRuntime) + // scalastyle:on line.size.limit + + sparkRuntimeTestCases.foreach { case (logPath, platformRuntimeCases) => + val baseFileName = logPath.split("/").last + platformRuntimeCases.foreach { case 
(platform, expectedRuntime) => + test(s"test eventlog $baseFileName on $platform has runtime: $expectedRuntime") { + val args = Array("--platform", platform, logPath) + val apps = ToolTestUtils.processProfileApps(args, sparkSession) + assert(apps.size == 1) + assert(apps.head.getSparkRuntime == expectedRuntime) + } } } } From 383b71c78de50b79742c164c9c67dd30121c4c50 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Wed, 13 Nov 2024 11:19:59 -0800 Subject: [PATCH 2/4] Refactor comments Signed-off-by: Partho Sarthi --- .../scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 1b7d1cced..8a00aea91 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -43,7 +43,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea private val outputCombined: Boolean = appArgs.combined() private val useAutoTuner: Boolean = appArgs.autoTuner() private val outputAlignedSQLIds: Boolean = appArgs.outputSqlIdsAligned() - // Unlike qualification tool, profiler tool does not platform per app + // Unlike qualification tool, profiler tool does not require platform per app private val platform: Platform = PlatformFactory.createInstance(appArgs.platform()) override def getNumThreads: Int = appArgs.numThreads.getOrElse( From c4b8a52898f8c4b5c05ccf627c4e0b1557922949 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 10 Dec 2024 15:45:36 -0800 Subject: [PATCH 3/4] Update behavior to fail on unsupported Spark Runtime Signed-off-by: Partho Sarthi --- .../spark/sql/rapids/tool/AppBase.scala | 30 ++++++++----------- .../spark/sql/rapids/tool/ToolUtils.scala | 9 +++++- .../tool/planparser/BasePlanParserSuite.scala | 7 +++-- 
.../planparser/PhotonPlanParserSuite.scala | 3 +- .../rapids/tool/profiling/AnalysisSuite.scala | 20 ++++++++----- .../tool/profiling/ApplicationInfoSuite.scala | 30 +++++++++++++++---- .../qualification/QualificationSuite.scala | 8 +++-- .../features/event_log_processing.feature | 14 +++++---- 8 files changed, 78 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index 9b28a773d..971a8711f 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -37,7 +37,7 @@ import org.apache.spark.scheduler.{SparkListenerEvent, StageInfo} import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.SparkPlanGraphNode import org.apache.spark.sql.rapids.tool.store.{AccumManager, DataSourceRecord, SQLPlanModelManager, StageModel, StageModelManager, TaskModelManager} -import org.apache.spark.sql.rapids.tool.util.{EventUtils, RapidsToolsConfUtil, SparkRuntime, ToolsPlanGraph, UTF8Source} +import org.apache.spark.sql.rapids.tool.util.{EventUtils, RapidsToolsConfUtil, ToolsPlanGraph, UTF8Source} import org.apache.spark.util.Utils abstract class AppBase( @@ -482,6 +482,7 @@ abstract class AppBase( protected def postCompletion(): Unit = { registerAttemptId() calculateAppDuration() + validateSparkRuntime() } /** @@ -494,24 +495,17 @@ abstract class AppBase( } /** - * Returns the SparkRuntime environment in which the application is being executed. - * This is calculated based on other cached properties. - * - * If the platform is provided, and it does not support the parsed runtime, - * the method will log a warning and fall back to the platform’s default runtime. + * Validates if the spark runtime (parsed from event log) is supported by the platform. 
+ * If the runtime is not supported, an `UnsupportedSparkRuntimeException` + * is thrown. */ - override def getSparkRuntime: SparkRuntime.SparkRuntime = { - val parsedRuntime = super.getSparkRuntime - platform.map { p => - if (p.isRuntimeSupported(parsedRuntime)) { - parsedRuntime - } else { - logWarning(s"Application $appId: Platform '${p.platformName}' does not support " + - s"the parsed runtime '$parsedRuntime'. Falling back to default runtime - " + - s"'${p.defaultRuntime}'.") - p.defaultRuntime - } - }.getOrElse(parsedRuntime) + private def validateSparkRuntime(): Unit = { + val parsedRuntime = getSparkRuntime + platform.foreach { p => + if (!p.isRuntimeSupported(parsedRuntime)) { + throw UnsupportedSparkRuntimeException(p, parsedRuntime) + } + } } } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index 021337495..455f19147 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.rapids.tool import scala.collection.mutable import scala.util.{Failure, Success, Try} +import com.nvidia.spark.rapids.tool.Platform import com.nvidia.spark.rapids.tool.planparser.SubqueryExecParser import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter @@ -28,7 +29,7 @@ import org.apache.spark.internal.{config, Logging} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphNode} -import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph +import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, ToolsPlanGraph} object ToolUtils extends Logging { // List of recommended file-encodings on the GPUs. 
@@ -441,6 +442,12 @@ case class UnsupportedMetricNameException(metricName: String) extends AppEventlogProcessException( s"Unsupported metric name found in the event log: $metricName") +case class UnsupportedSparkRuntimeException( + platform: Platform, + sparkRuntime: SparkRuntime.SparkRuntime) + extends AppEventlogProcessException( + s"Platform '${platform.platformName}' does not support the runtime '$sparkRuntime'") + // Class used a container to hold the information of the Tuple // to simplify arguments of methods and caching. case class SqlPlanInfoGraphEntry( diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala index b7966d4d2..6fe3cd2cd 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids.tool.planparser import com.nvidia.spark.rapids.BaseTestSuite -import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory, ToolTestUtils} +import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory, PlatformNames, ToolTestUtils} import com.nvidia.spark.rapids.tool.qualification._ import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo @@ -59,7 +59,8 @@ class BasePlanParserSuite extends BaseTestSuite { } } - def createAppFromEventlog(eventLog: String): QualificationAppInfo = { + def createAppFromEventlog(eventLog: String, + platformName: String = PlatformNames.DEFAULT): QualificationAppInfo = { val hadoopConf = RapidsToolsConfUtil.newHadoopConf() val (_, allEventLogs) = EventLogPathProcessor.processAllPaths( None, None, List(eventLog), hadoopConf) @@ -67,7 +68,7 @@ class BasePlanParserSuite extends BaseTestSuite { assert(allEventLogs.size == 1) val appResult = QualificationAppInfo.createApp(allEventLogs.head, 
hadoopConf, pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true, - PlatformFactory.createInstance()) + PlatformFactory.createInstance(platformName)) appResult match { case Right(app) => app case Left(_) => throw new AssertionError("Cannot create application") diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala index edf8095bc..74f237178 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala @@ -16,6 +16,7 @@ package com.nvidia.spark.rapids.tool.planparser +import com.nvidia.spark.rapids.tool.PlatformNames import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker @@ -34,7 +35,7 @@ class PhotonPlanParserSuite extends BasePlanParserSuite { test(s"$photonName is parsed as Spark $sparkName") { val eventLog = s"$qualLogDir/nds_q88_photon_db_13_3.zstd" val pluginTypeChecker = new PluginTypeChecker() - val app = createAppFromEventlog(eventLog) + val app = createAppFromEventlog(eventLog, platformName = PlatformNames.DATABRICKS_AWS) assert(app.sqlPlans.nonEmpty) val parsedPlans = app.sqlPlans.map { case (sqlID, plan) => SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index 2b8c3bf12..b7d8b315f 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool.profiling import java.io.File -import com.nvidia.spark.rapids.tool.ToolTestUtils +import com.nvidia.spark.rapids.tool.{PlatformNames, 
ToolTestUtils} import com.nvidia.spark.rapids.tool.views.{ProfDataSourceView, RawMetricProfilerView} import org.scalatest.FunSuite @@ -139,7 +139,8 @@ class AnalysisSuite extends FunSuite { s"${fileName}_${metric}_metrics_agg_expectation.csv" } testSqlMetricsAggregation(Array(s"${qualLogDir}/${fileName}.zstd"), - expectFile("sql"), expectFile("job"), expectFile("stage")) + expectFile("sql"), expectFile("job"), expectFile("stage"), + platformName = PlatformNames.DATABRICKS_AWS) } test("test stage-level diagnostic aggregation simple") { @@ -163,8 +164,10 @@ class AnalysisSuite extends FunSuite { } private def testSqlMetricsAggregation(logs: Array[String], expectFileSQL: String, - expectFileJob: String, expectFileStage: String): Unit = { - val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + expectFileJob: String, expectFileStage: String, + platformName: String = PlatformNames.DEFAULT): Unit = { + val args = Array("--platform", platformName) ++ logs + val apps = ToolTestUtils.processProfileApps(args, sparkSession) assert(apps.size == logs.size) val aggResults = RawMetricProfilerView.getAggMetrics(apps) import sparkSession.implicits._ @@ -256,9 +259,12 @@ class AnalysisSuite extends FunSuite { } test("test photon scan metrics") { - val fileName = "nds_q88_photon_db_13_3" - val logs = Array(s"${qualLogDir}/${fileName}.zstd") - val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val args = Array( + "--platform", + PlatformNames.DATABRICKS_AWS, + s"$qualLogDir/nds_q88_photon_db_13_3.zstd" + ) + val apps = ToolTestUtils.processProfileApps(args, sparkSession) val dataSourceResults = ProfDataSourceView.getRawView(apps) assert(dataSourceResults.exists(_.scan_time > 0)) } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index a7837e602..5218f4358 100644 --- 
a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.FunSuite import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.{SparkSession, TrampolineUtil} +import org.apache.spark.sql.rapids.tool.UnsupportedSparkRuntimeException import org.apache.spark.sql.rapids.tool.profiling._ import org.apache.spark.sql.rapids.tool.util.{FSUtils, SparkRuntime} @@ -1117,7 +1118,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { } // scalastyle:off line.size.limit - val sparkRuntimeTestCases: Map[String, Seq[(String, SparkRuntime.Value)]] = Map( + val supportedSparkRuntimeTestCases: Map[String, Seq[(String, SparkRuntime.SparkRuntime)]] = Map( // tests for standard Spark runtime s"$qualLogDir/nds_q86_test" -> Seq( (PlatformNames.DATABRICKS_AWS, SparkRuntime.SPARK), // Expected: SPARK on Databricks AWS @@ -1132,16 +1133,14 @@ class ApplicationInfoSuite extends FunSuite with Logging { s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq( (PlatformNames.DATABRICKS_AWS, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks AWS (PlatformNames.DATABRICKS_AZURE, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks Azure - (PlatformNames.ONPREM, SparkRuntime.SPARK), // Expected: Fallback to SPARK on Onprem - (PlatformNames.DATAPROC, SparkRuntime.SPARK) // Expected: Fallback to SPARK on Dataproc ) ) // scalastyle:on line.size.limit - sparkRuntimeTestCases.foreach { case (logPath, platformRuntimeCases) => + supportedSparkRuntimeTestCases.foreach { case (logPath, platformRuntimeCases) => val baseFileName = logPath.split("/").last platformRuntimeCases.foreach { case (platform, expectedRuntime) => - test(s"test eventlog $baseFileName on $platform has runtime: $expectedRuntime") { + test(s"test eventlog $baseFileName on $platform has supported runtime: 
$expectedRuntime") { val args = Array("--platform", platform, logPath) val apps = ToolTestUtils.processProfileApps(args, sparkSession) assert(apps.size == 1) @@ -1149,4 +1148,25 @@ class ApplicationInfoSuite extends FunSuite with Logging { } } } + + // scalastyle:off line.size.limit + val unsupportedSparkRuntimeTestCases: Map[String, Seq[String]] = Map( + s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq( + PlatformNames.ONPREM, // Expected: PHOTON runtime on Onprem is not supported + PlatformNames.DATAPROC // Expected: PHOTON runtime on Dataproc is not supported + ) + ) + // scalastyle:on line.size.limit + + unsupportedSparkRuntimeTestCases.foreach { case (logPath, platformNames) => + val baseFileName = logPath.split("/").last + platformNames.foreach { platform => + test(s"test eventlog $baseFileName on $platform has unsupported runtime") { + val args = Array("--platform", platform, logPath) + intercept[UnsupportedSparkRuntimeException] { + ToolTestUtils.processProfileApps(args, sparkSession) + } + } + } + } } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 03943d463..6de463db1 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -136,12 +136,15 @@ class QualificationSuite extends BaseTestSuite { } } - private def runQualificationTest(eventLogs: Array[String], expectFileName: String = "", + private def runQualificationTest(eventLogs: Array[String], + expectFileName: String = "", platformName: String = PlatformNames.DEFAULT, shouldReturnEmpty: Boolean = false, expectPerSqlFileName: Option[String] = None, expectedStatus: Option[StatusReportCounts] = None): Unit = { TrampolineUtil.withTempDir { outpath => val qualOutputPrefix = "rapids_4_spark_qualification_output" val 
outputArgs = Array( + "--platform", + platformName, "--output-directory", outpath.getAbsolutePath()) @@ -1762,7 +1765,8 @@ class QualificationSuite extends BaseTestSuite { val logFiles = Array(s"$logDir/nds_q88_photon_db_13_3.zstd") // photon event log // Status counts: 1 SUCCESS, 0 FAILURE, 0 SKIPPED, 0 UNKNOWN val expectedStatus = Some(StatusReportCounts(1, 0, 0, 0)) - runQualificationTest(logFiles, expectedStatus = expectedStatus) + runQualificationTest(logFiles, platformName = PlatformNames.DATABRICKS_AWS, + expectedStatus = expectedStatus) } test("process multiple attempts of the same app ID and skip lower attempts") { diff --git a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature index cd66b0bb6..fc7ec2a52 100644 --- a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature +++ b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature @@ -16,6 +16,7 @@ Feature: Event Log Processing @test_id_ELP_0001 Scenario Outline: Tool spark_rapids runs with different types of event logs + Given platform is "<platform>" When spark-rapids tool is executed with "<event_logs>" eventlogs Then stderr contains the following """ @@ -25,12 +26,13 @@ Feature: Event Log Processing And return code is "0" Examples: - | event_logs | expected_stderr | processed_apps_count | - | invalid_path_eventlog | process.failure.count = 1;invalid_path_eventlog not found, skipping! | 0 | - | gpu_eventlog.zstd | process.skipped.count = 1;GpuEventLogException: Cannot parse event logs from GPU run: skipping this file | 0 | - | photon_eventlog.zstd | process.success.count = 1; | 1 | - | streaming_eventlog.zstd | process.skipped.count = 1;StreamingEventLogException: Encountered Spark Structured Streaming Job: skipping this file! | 0 | - | incorrect_app_status_eventlog.zstd | process.NA.count = 1;IncorrectAppStatusException: Application status is incorrect. 
Missing AppInfo | 0 | + | platform | event_logs | expected_stderr | processed_apps_count | + | onprem | invalid_path_eventlog | process.failure.count = 1;invalid_path_eventlog not found, skipping! | 0 | + | onprem | gpu_eventlog.zstd | process.skipped.count = 1;GpuEventLogException: Cannot parse event logs from GPU run: skipping this file | 0 | + | onprem | streaming_eventlog.zstd | process.skipped.count = 1;StreamingEventLogException: Encountered Spark Structured Streaming Job: skipping this file! | 0 | + | onprem | incorrect_app_status_eventlog.zstd | process.NA.count = 1;IncorrectAppStatusException: Application status is incorrect. Missing AppInfo | 0 | + | onprem | photon_eventlog.zstd | process.skipped.count = 1;UnsupportedSparkRuntimeException: Platform 'onprem' does not support the runtime 'PHOTON' | 0 | + | databricks-aws | photon_eventlog.zstd | process.success.count = 1; | 1 | @test_id_ELP_0002 Scenario: Qualification tool JAR crashes From 4379878fdf5725914a7ebb16dfcf6aed45c95b75 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 10 Dec 2024 16:09:53 -0800 Subject: [PATCH 4/4] Fix trailing comma Signed-off-by: Partho Sarthi --- .../spark/rapids/tool/profiling/ApplicationInfoSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index 5218f4358..1d40472c9 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -1132,7 +1132,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { // tests for Photon runtime with fallback to SPARK for unsupported platforms s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq( (PlatformNames.DATABRICKS_AWS, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks AWS - 
(PlatformNames.DATABRICKS_AZURE, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks Azure + (PlatformNames.DATABRICKS_AZURE, SparkRuntime.PHOTON) // Expected: PHOTON on Databricks Azure ) ) // scalastyle:on line.size.limit