Skip to content

Commit

Permalink
Add sparkRuntime property to capture runtime type in application_information (#1414)
Browse files Browse the repository at this point in the history

* Store spark runtime in app info csv

Signed-off-by: Partho Sarthi <[email protected]>

* Address review comments & convert variable to getter

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa authored Nov 8, 2024
1 parent 019ede2 commit 4e783d9
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.collection.Map

import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
import org.apache.spark.sql.rapids.tool.store.AccumMetaRef
import org.apache.spark.sql.rapids.tool.util.StringUtils
import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils}

/**
* This is a warehouse to store all Classes
Expand Down Expand Up @@ -292,11 +292,11 @@ case class UnsupportedOpsProfileResult(appIndex: Int,
case class AppInfoProfileResults(appIndex: Int, appName: String,
appId: Option[String], sparkUser: String,
startTime: Long, endTime: Option[Long], duration: Option[Long],
durationStr: String, sparkVersion: String,
durationStr: String, sparkRuntime: SparkRuntime.SparkRuntime, sparkVersion: String,
pluginEnabled: Boolean) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "appName", "appId",
"sparkUser", "startTime", "endTime", "duration", "durationStr",
"sparkVersion", "pluginEnabled")
"sparkRuntime", "sparkVersion", "pluginEnabled")

def endTimeToStr: String = {
endTime match {
Expand All @@ -315,12 +315,13 @@ case class AppInfoProfileResults(appIndex: Int, appName: String,
override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, appName, appId.getOrElse(""),
sparkUser, startTime.toString, endTimeToStr, durToStr,
durationStr, sparkVersion, pluginEnabled.toString)
durationStr, sparkRuntime.toString, sparkVersion, pluginEnabled.toString)
}
override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString, StringUtils.reformatCSVString(appName),
StringUtils.reformatCSVString(appId.getOrElse("")), StringUtils.reformatCSVString(sparkUser),
startTime.toString, endTimeToStr, durToStr, StringUtils.reformatCSVString(durationStr),
StringUtils.reformatCSVString(sparkRuntime.toString),
StringUtils.reformatCSVString(sparkVersion), pluginEnabled.toString)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait AppInformationViewTrait extends ViewableTrait[AppInfoProfileResults] {
app.appMetaData.map { a =>
AppInfoProfileResults(index, a.appName, a.appId,
a.sparkUser, a.startTime, a.endTime, app.getAppDuration,
a.getDurationString, app.sparkVersion, app.gpuMode)
a.getDurationString, app.getSparkRuntime, app.sparkVersion, app.gpuMode)
}.toSeq
}
override def sortView(rows: Seq[AppInfoProfileResults]): Seq[AppInfoProfileResults] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ trait ClusterTagPropHandler extends CacheablePropsHandler {
var clusterTagClusterId: String = ""
var clusterTagClusterName: String = ""

// A flag to indicate whether the eventlog being processed is an eventlog from Photon.
var isPhoton = false

// Flag used to indicate that the App was a Databricks App.
def isDatabricks: Boolean = {
clusterTags.nonEmpty && clusterTagClusterId.nonEmpty && clusterTagClusterName.nonEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,46 @@ import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListener
import org.apache.spark.sql.rapids.tool.AppEventlogProcessException
import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT


/**
* SparkRuntime enumeration is used to identify the specific runtime environment
* in which the application is being executed.
*/
/**
 * Enumeration of the runtime environments an application's eventlog can
 * originate from. Used to report the runtime type in the app-info output.
 */
object SparkRuntime extends Enumeration {
  type SparkRuntime = Value

  /** Default (CPU) Apache Spark runtime. */
  val SPARK: SparkRuntime = Value

  /** Spark RAPIDS (GPU-accelerated) runtime. */
  val SPARK_RAPIDS: SparkRuntime = Value

  /** Databricks Photon runtime. */
  val PHOTON: SparkRuntime = Value

  /**
   * Resolves the runtime from the cached eventlog flags. Photon takes
   * precedence over GPU mode when both flags are set.
   *
   * @param isPhoton whether the eventlog comes from a Photon application
   * @param isGpu whether the eventlog comes from a GPU (RAPIDS) application
   * @return the matching [[SparkRuntime]] value; SPARK when neither flag is set
   */
  def getRuntime(isPhoton: Boolean, isGpu: Boolean): SparkRuntime.SparkRuntime = {
    (isPhoton, isGpu) match {
      case (true, _) => PHOTON
      case (false, true) => SPARK_RAPIDS
      case _ => SPARK
    }
  }
}

// Handles updating and caching Spark Properties for a Spark application.
// Properties stored in this container can be accessed to make decision about certain analysis
// that depends on the context of the Spark properties.
Expand Down Expand Up @@ -68,10 +108,13 @@ trait CacheablePropsHandler {

// caches the spark-version from the eventlogs
var sparkVersion: String = ""
// A flag to indicate whether the eventlog is an eventlog with Spark RAPIDS runtime.
var gpuMode = false
// A flag to indicate whether the eventlog is an eventlog from Photon runtime.
var isPhoton = false
// A flag whether hive is enabled or not. Note that we assume that the
// property is global to the entire application once it is set. a.k.a, it cannot be disabled
// once it is was set to true.
// once it was set to true.
var hiveEnabled = false
// Indicates the ML eventlogType (i.e., Scala or pyspark). It is set only when MLOps are detected.
// By default, it is empty.
Expand Down Expand Up @@ -132,4 +175,12 @@ trait CacheablePropsHandler {
/**
 * Returns true when the given job runs in GPU mode: either the app-level
 * gpuMode flag is already set, or the job's own properties show the RAPIDS
 * plugin enabled. The plugin check is skipped once gpuMode is known.
 */
def isGPUModeEnabledForJob(event: SparkListenerJobStart): Boolean = {
  if (gpuMode) {
    true
  } else {
    ProfileUtils.isPluginEnabled(event.properties.asScala)
  }
}

/**
 * The runtime environment (SPARK, SPARK_RAPIDS, or PHOTON) this application
 * executed in, derived from the cached isPhoton and gpuMode eventlog flags.
 */
def getSparkRuntime: SparkRuntime.SparkRuntime = SparkRuntime.getRuntime(isPhoton, gpuMode)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.sql.{SparkSession, TrampolineUtil}
import org.apache.spark.sql.rapids.tool.profiling._
import org.apache.spark.sql.rapids.tool.util.FSUtils
import org.apache.spark.sql.rapids.tool.util.{FSUtils, SparkRuntime}

class ApplicationInfoSuite extends FunSuite with Logging {

Expand Down Expand Up @@ -1115,4 +1115,18 @@ class ApplicationInfoSuite extends FunSuite with Logging {
assert(actualResult == expectedResult)
}
}

// Table of (expected runtime, eventlog path) pairs driving the generated tests below.
val sparkRuntimeTestCases: Seq[(SparkRuntime.Value, String)] = Seq(
  SparkRuntime.SPARK -> s"$qualLogDir/nds_q86_test",
  SparkRuntime.SPARK_RAPIDS -> s"$logDir/nds_q66_gpu.zstd",
  SparkRuntime.PHOTON -> s"$qualLogDir/nds_q88_photon_db_13_3.zstd"
)

// Register one test per table row: each eventlog must yield exactly one app
// whose detected runtime matches the expected value.
for ((expectedSparkRuntime, eventLog) <- sparkRuntimeTestCases) {
  test(s"test spark runtime property for ${expectedSparkRuntime.toString} eventlog") {
    val apps = ToolTestUtils.processProfileApps(Array(eventLog), sparkSession)
    assert(apps.size == 1)
    assert(apps.head.getSparkRuntime == expectedSparkRuntime)
  }
}
}

0 comments on commit 4e783d9

Please sign in to comment.