From 4e783d91c4def697f18c6601fb5a96761a25c087 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 8 Nov 2024 11:30:05 -0800 Subject: [PATCH] Add sparkRuntime property to capture runtime type in application_information (#1414) * Store spark runtime in app info csv Signed-off-by: Partho Sarthi * Address review comments & convert variable to getter Signed-off-by: Partho Sarthi --------- Signed-off-by: Partho Sarthi --- .../profiling/ProfileClassWarehouse.scala | 9 ++-- .../rapids/tool/views/InformationView.scala | 2 +- .../rapids/tool/ClusterTagPropHandler.scala | 3 -- .../tool/util/CacheablePropsHandler.scala | 53 ++++++++++++++++++- .../tool/profiling/ApplicationInfoSuite.scala | 16 +++++- 5 files changed, 73 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index b98d36aab..24b6cbd73 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -20,7 +20,7 @@ import scala.collection.Map import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest} import org.apache.spark.sql.rapids.tool.store.AccumMetaRef -import org.apache.spark.sql.rapids.tool.util.StringUtils +import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils} /** * This is a warehouse to store all Classes @@ -292,11 +292,11 @@ case class UnsupportedOpsProfileResult(appIndex: Int, case class AppInfoProfileResults(appIndex: Int, appName: String, appId: Option[String], sparkUser: String, startTime: Long, endTime: Option[Long], duration: Option[Long], - durationStr: String, sparkVersion: String, + durationStr: String, sparkRuntime: SparkRuntime.SparkRuntime, sparkVersion: String, pluginEnabled: Boolean) extends ProfileResult { override val outputHeaders = Seq("appIndex", "appName", "appId", "sparkUser", "startTime", "endTime", "duration", "durationStr", - "sparkVersion", "pluginEnabled") + "sparkRuntime", "sparkVersion", "pluginEnabled") def endTimeToStr: String = { endTime match { @@ -315,12 +315,13 @@ case class AppInfoProfileResults(appIndex: Int, appName: String, override def convertToSeq: Seq[String] = { Seq(appIndex.toString, appName, appId.getOrElse(""), sparkUser, startTime.toString, endTimeToStr, durToStr, - durationStr, sparkVersion, pluginEnabled.toString) + durationStr, sparkRuntime.toString, sparkVersion, pluginEnabled.toString) } override def convertToCSVSeq: Seq[String] = { Seq(appIndex.toString, StringUtils.reformatCSVString(appName), StringUtils.reformatCSVString(appId.getOrElse("")), StringUtils.reformatCSVString(sparkUser), startTime.toString, endTimeToStr, durToStr, StringUtils.reformatCSVString(durationStr), + StringUtils.reformatCSVString(sparkRuntime.toString), StringUtils.reformatCSVString(sparkVersion), pluginEnabled.toString) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/InformationView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/InformationView.scala index 1da665b1f..7a7484c38 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/InformationView.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/InformationView.scala @@ -29,7 +29,7 @@ trait AppInformationViewTrait extends ViewableTrait[AppInfoProfileResults] { app.appMetaData.map { a => AppInfoProfileResults(index, a.appName, a.appId, a.sparkUser, 
a.startTime, a.endTime, app.getAppDuration,
-          a.getDurationString, app.sparkVersion, app.gpuMode)
+          a.getDurationString, app.getSparkRuntime, app.sparkVersion, app.gpuMode)
     }.toSeq
   }
   override def sortView(rows: Seq[AppInfoProfileResults]): Seq[AppInfoProfileResults] = {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ClusterTagPropHandler.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ClusterTagPropHandler.scala
index 4377ef0e5..aebab1082 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ClusterTagPropHandler.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ClusterTagPropHandler.scala
@@ -28,9 +28,6 @@ trait ClusterTagPropHandler extends CacheablePropsHandler {
   var clusterTagClusterId: String = ""
   var clusterTagClusterName: String = ""
 
-  // A flag to indicate whether the eventlog being processed is an eventlog from Photon.
-  var isPhoton = false
-
   // Flag used to indicate that the App was a Databricks App.
   def isDatabricks: Boolean = {
     clusterTags.nonEmpty && clusterTagClusterId.nonEmpty && clusterTagClusterName.nonEmpty
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/CacheablePropsHandler.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/CacheablePropsHandler.scala
index bbdffc1d7..50b6763ff 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/CacheablePropsHandler.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/CacheablePropsHandler.scala
@@ -25,6 +25,46 @@ import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListener
 import org.apache.spark.sql.rapids.tool.AppEventlogProcessException
 import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
+
+/**
+ * SparkRuntime enumeration is used to identify the specific runtime environment
+ * in which the application is being executed.
+ */
+object SparkRuntime extends Enumeration {
+  type SparkRuntime = Value
+
+  /**
+   * Represents the default Apache Spark runtime environment.
+   */
+  val SPARK: SparkRuntime = Value
+
+  /**
+   * Represents the Spark RAPIDS runtime environment.
+   */
+  val SPARK_RAPIDS: SparkRuntime = Value
+
+  /**
+   * Represents the Photon runtime environment on Databricks.
+   */
+  val PHOTON: SparkRuntime = Value
+
+  /**
+   * Returns the SparkRuntime value based on the given parameters.
+   * @param isPhoton Boolean flag indicating whether the application is running on Photon.
+   * @param isGpu Boolean flag indicating whether the application is running on GPU.
+   * @return the SparkRuntime value matching the given flags, with Photon taking precedence.
+   */
+  def getRuntime(isPhoton: Boolean, isGpu: Boolean): SparkRuntime.SparkRuntime = {
+    if (isPhoton) {
+      PHOTON
+    } else if (isGpu) {
+      SPARK_RAPIDS
+    } else {
+      SPARK
+    }
+  }
+}
+
 // Handles updating and caching Spark Properties for a Spark application.
 // Properties stored in this container can be accessed to make decision about certain analysis
 // that depends on the context of the Spark properties.
@@ -68,10 +108,13 @@ trait CacheablePropsHandler {
   // caches the spark-version from the eventlogs
   var sparkVersion: String = ""
+  // A flag to indicate whether the eventlog was generated with the Spark RAPIDS runtime.
   var gpuMode = false
+  // A flag to indicate whether the eventlog was generated with the Photon runtime.
+  var isPhoton = false
   // A flag whether hive is enabled or not. Note that we assume that the
   // property is global to the entire application once it is set. a.k.a, it cannot be disabled
-  // once it is was set to true.
+  // once it was set to true.
var hiveEnabled = false // Indicates the ML eventlogType (i.e., Scala or pyspark). It is set only when MLOps are detected. // By default, it is empty. @@ -132,4 +175,12 @@ trait CacheablePropsHandler { def isGPUModeEnabledForJob(event: SparkListenerJobStart): Boolean = { gpuMode || ProfileUtils.isPluginEnabled(event.properties.asScala) } + + /** + * Returns the SparkRuntime environment in which the application is being executed. + * This is calculated based on other cached properties. + */ + def getSparkRuntime: SparkRuntime.SparkRuntime = { + SparkRuntime.getRuntime(isPhoton, gpuMode) + } } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index 75cd86df5..de9921cec 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.{SparkSession, TrampolineUtil} import org.apache.spark.sql.rapids.tool.profiling._ -import org.apache.spark.sql.rapids.tool.util.FSUtils +import org.apache.spark.sql.rapids.tool.util.{FSUtils, SparkRuntime} class ApplicationInfoSuite extends FunSuite with Logging { @@ -1115,4 +1115,18 @@ class ApplicationInfoSuite extends FunSuite with Logging { assert(actualResult == expectedResult) } } + + val sparkRuntimeTestCases: Seq[(SparkRuntime.Value, String)] = Seq( + SparkRuntime.SPARK -> s"$qualLogDir/nds_q86_test", + SparkRuntime.SPARK_RAPIDS -> s"$logDir/nds_q66_gpu.zstd", + SparkRuntime.PHOTON -> s"$qualLogDir/nds_q88_photon_db_13_3.zstd" + ) + + sparkRuntimeTestCases.foreach { case (expectedSparkRuntime, eventLog) => + test(s"test spark runtime property for ${expectedSparkRuntime.toString} eventlog") { + val apps = ToolTestUtils.processProfileApps(Array(eventLog), sparkSession) + assert(apps.size == 1) + assert(apps.head.getSparkRuntime == expectedSparkRuntime) + } + } }
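
Reviewer note (illustrative sketch, not part of the patch): the snippet below mirrors the SparkRuntime enumeration and getRuntime logic added in CacheablePropsHandler.scala, to make the classification precedence explicit. Photon is checked before GPU mode, so an eventlog that somehow carried both signals would still be reported as PHOTON. The enclosing SparkRuntimeSketch object and its main method are hypothetical scaffolding for this example only.

// Standalone sketch; mirrors the enumeration added by this patch.
// SparkRuntimeSketch and main are hypothetical, for illustration only.
object SparkRuntimeSketch {

  object SparkRuntime extends Enumeration {
    type SparkRuntime = Value
    val SPARK, SPARK_RAPIDS, PHOTON = Value

    // Photon is tested first, then GPU mode, then plain Apache Spark.
    def getRuntime(isPhoton: Boolean, isGpu: Boolean): SparkRuntime = {
      if (isPhoton) PHOTON
      else if (isGpu) SPARK_RAPIDS
      else SPARK
    }
  }

  def main(args: Array[String]): Unit = {
    import SparkRuntime._
    // (isPhoton, isGpu) -> expected runtime classification
    assert(getRuntime(isPhoton = false, isGpu = false) == SPARK)
    assert(getRuntime(isPhoton = false, isGpu = true) == SPARK_RAPIDS)
    assert(getRuntime(isPhoton = true, isGpu = false) == PHOTON)
    // If both flags were set, Photon wins because it is checked first.
    assert(getRuntime(isPhoton = true, isGpu = true) == PHOTON)
    println("runtime mapping verified")
  }
}

In the trait itself, getSparkRuntime simply delegates to SparkRuntime.getRuntime(isPhoton, gpuMode). That is why the patch moves the isPhoton flag from ClusterTagPropHandler into the shared CacheablePropsHandler: both cached flags now live next to the getter that combines them, and the new parameterized tests in ApplicationInfoSuite pin each of the three runtimes to a representative eventlog.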