Commit
Merge branch 'dev' into spark-rapids-tools-1399
parthosa authored Dec 20, 2024
2 parents b4c84c0 + 7308c12 commit c802031
Showing 29 changed files with 930 additions and 329 deletions.
19 changes: 18 additions & 1 deletion core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.tool.tuning.ClusterProperties

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
-import org.apache.spark.sql.rapids.tool.util.StringUtils
+import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils}

 /**
  * Utility object containing constants for various platform names.
@@ -132,6 +132,19 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
   var recommendedClusterInfo: Option[RecommendedClusterInfo] = None
   // the number of GPUs to use, this might be updated as we handle different cases
   var numGpus: Int = 1
+  // Default runtime for the platform
+  val defaultRuntime: SparkRuntime.SparkRuntime = SparkRuntime.SPARK
+  // Set of supported runtimes for the platform
+  protected val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set(
+    SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS
+  )
+
+  /**
+   * Checks if the given runtime is supported by the platform.
+   */
+  def isRuntimeSupported(runtime: SparkRuntime.SparkRuntime): Boolean = {
+    supportedRuntimes.contains(runtime)
+  }
 
   // This function allows us to have one gpu type used by the auto
   // tuner recommendations but have a different GPU used for speedup
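
The runtime gate added here is a plain set-membership check: each platform declares the runtimes it accepts in supportedRuntimes, and isRuntimeSupported only tests membership. Below is a minimal self-contained sketch of the base behavior; the PlatformSketch class and the SparkRuntime enumeration are illustrative stand-ins for the tool's real Platform class and org.apache.spark.sql.rapids.tool.util.SparkRuntime, which this diff does not show in full.

    // Illustrative stand-ins; the real classes live in spark-rapids-tools.
    object SparkRuntime extends Enumeration {
      val SPARK, SPARK_RAPIDS, PHOTON = Value
    }

    class PlatformSketch {
      // Base platforms accept vanilla Spark and the Spark RAPIDS plugin runtime.
      protected val supportedRuntimes: Set[SparkRuntime.Value] =
        Set(SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS)

      def isRuntimeSupported(runtime: SparkRuntime.Value): Boolean =
        supportedRuntimes.contains(runtime)
    }

    object RuntimeGateDemo extends App {
      val platform = new PlatformSketch
      println(platform.isRuntimeSupported(SparkRuntime.SPARK_RAPIDS)) // true
      println(platform.isRuntimeSupported(SparkRuntime.PHOTON))       // false
    }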
@@ -511,6 +524,10 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],
   override val defaultGpuDevice: GpuDevice = T4Gpu
   override def isPlatformCSP: Boolean = true
 
+  override val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set(
+    SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS, SparkRuntime.PHOTON
+  )
+
   // note that Databricks generally sets the spark.executor.memory for the user. Our
   // auto tuner heuristics generally set it lower than Databricks, so go ahead and
   // allow our auto tuner to take effect for this in anticipation that we will use more
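
Databricks widens the set rather than changing any logic, so Photon event logs pass the check only on Databricks platforms. Continuing the illustrative sketch above (same stand-in classes, not the real tool code):

    class DatabricksPlatformSketch extends PlatformSketch {
      // Photon is only a supported runtime on Databricks.
      override val supportedRuntimes: Set[SparkRuntime.Value] =
        Set(SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS, SparkRuntime.PHOTON)
    }

    // new DatabricksPlatformSketch().isRuntimeSupported(SparkRuntime.PHOTON) // true

Keeping the capability declarative in a Set means adding a runtime or platform only touches data, not the check itself.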
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
@@ -16,6 +16,7 @@

 package com.nvidia.spark.rapids.tool.analysis
 
+import scala.collection.breakOut
 import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}
 
 import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
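
The new breakOut import does the heavy lifting in the refactors below. In Scala 2.12, collection transformers like map, flatMap, and collect take an implicit CanBuildFrom that decides the result collection type; passing scala.collection.breakOut makes them build the desired target type directly, so an intermediate collection plus a trailing .toSeq or .toSet conversion can be dropped. A standalone illustration, not part of this patch (Scala 2.12, since breakOut was removed in 2.13):

    import scala.collection.breakOut

    object BreakOutDemo extends App {
      val byId = Map(1 -> "a", 2 -> "b", 3 -> "c")

      // Two passes: map materializes an intermediate collection, then toSet converts it.
      val twoSteps: Set[String] = byId.map(_._2).toSet

      // One pass: breakOut hands map a Set builder, skipping the intermediate collection.
      val onePass: Set[String] = byId.map(_._2)(breakOut)

      println(twoSteps == onePass) // true
    }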
@@ -265,7 +266,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
     val jobsWithSQL = app.jobIdToInfo.filter { case (_, j) =>
       j.sqlID.nonEmpty
     }
-    val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
+    jobsWithSQL.flatMap { case (jobId, j) =>
       val stages = j.stageIds
       val stagesInJob = app.stageManager.getStagesByIds(stages)
       stagesInJob.map { sModel =>
@@ -283,8 +284,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
         SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
           sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
       }
-    }
-    sqlToStages.toSeq
+    }(breakOut)
   }
 
   def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
@@ -294,20 +294,11 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
       val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
       val driverMax = driverAccumsOpt match {
         case Some(accums) =>
-          val filtered = accums.filter { a =>
-            a.sqlID == metric.sqlID
-          }
-          val accumValues = filtered.map(_.value).sortWith(_ < _)
-          if (accumValues.isEmpty) {
-            None
-          } else if (accumValues.length <= 1) {
-            Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
-          } else {
-            Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
-              accumValues(accumValues.size - 1), accumValues.sum))
-          }
-        case None =>
-          None
+          StatisticsMetrics.createOptionalFromArr(accums.collect {
+            case a if a.sqlID == metric.sqlID =>
+              a.value
+          }(breakOut))
+        case _ => None
       }
 
       if (accumTaskStats.isDefined || driverMax.isDefined) {
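
StatisticsMetrics.createOptionalFromArr is not shown in this diff, but the branch logic it replaces pins down its contract: None for no samples, a total-only result for a single sample, and min/median/max/total over sorted values otherwise. The following is a hypothetical reconstruction inferred from the removed inline code; the real helper is defined elsewhere in the tools codebase and may differ in details such as the exact argument type. The field names min/med/max/total follow the stat.min and stat.med accesses later in this diff.

    // Hypothetical sketch of the helper's contract, inferred from the removed inline logic.
    case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

    object StatisticsMetrics {
      def createOptionalFromArr(arr: Array[Long]): Option[StatisticsMetrics] = {
        if (arr.isEmpty) {
          None                                          // no samples, no stats
        } else if (arr.length == 1) {
          Some(StatisticsMetrics(0L, 0L, 0L, arr.head)) // single sample: total only, as before
        } else {
          val sorted = arr.sorted
          Some(StatisticsMetrics(
            sorted.head,                // min
            sorted(sorted.length / 2),  // median (upper middle element for even sizes)
            sorted.last,                // max
            sorted.sum))                // total
        }
      }
    }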
@@ -325,7 +316,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
       } else {
         None
       }
-    }
+    }(breakOut)
   }
 
   /**
@@ -341,40 +332,31 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
   def generateStageLevelAccums(): Seq[AccumProfileResults] = {
     app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
       val accumInfo = accumMapEntry._2
-      accumInfo.stageValuesMap.keySet.flatMap( stageId => {
-        val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet
-        // get the task updates that belong to that stage
-        val taskUpatesSubset =
-          accumInfo.taskUpdatesMap.filterKeys(stageTaskIds.contains).values.toSeq.sorted
-        if (taskUpatesSubset.isEmpty) {
-          None
-        } else {
-          val min = taskUpatesSubset.head
-          val max = taskUpatesSubset.last
-          val sum = taskUpatesSubset.sum
-          val median = if (taskUpatesSubset.size % 2 == 0) {
-            val mid = taskUpatesSubset.size / 2
-            (taskUpatesSubset(mid) + taskUpatesSubset(mid - 1)) / 2
-          } else {
-            taskUpatesSubset(taskUpatesSubset.size / 2)
-          }
-          // reuse AccumProfileResults to avoid generating extra memory from allocating new objects
-          val accumProfileResults = AccumProfileResults(
-            appIndex,
-            stageId,
-            accumInfo.infoRef,
-            min = min,
-            median = median,
-            max = max,
-            total = sum)
-          if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
-            updateStageDiagnosticMetrics(accumProfileResults)
-          }
-          Some(accumProfileResults)
+      accumInfo.stageValuesMap.keys.flatMap( stageId => {
+        val stageTaskIds: Set[Long] =
+          app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
+        // Get the task updates that belong to that stage
+        StatisticsMetrics.createOptionalFromArr(
+          accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
+          case Some(stat) =>
+            // Reuse AccumProfileResults to avoid generating extra memory from allocating new objects
+            val accumProfileResults = AccumProfileResults(
+              appIndex,
+              stageId,
+              accumInfo.infoRef,
+              min = stat.min,
+              median = stat.med,
+              max = stat.max,
+              total = stat.total)
+            if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
+              updateStageDiagnosticMetrics(accumProfileResults)
+            }
+            Some(accumProfileResults)
+          case _ => None
+        }
       })
-      }
-    }.toSeq
+    }(breakOut)
   }
 }
 
 object AppSQLPlanAnalyzer {
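
One subtlety in the rewritten loop: filterKeys(stageTaskIds) passes the Set[Long] itself as the predicate, replacing the earlier filterKeys(stageTaskIds.contains). This works because Scala's Set[A] extends A => Boolean. A quick standalone illustration (Scala 2.12, where Map.filterKeys is not yet deprecated):

    object SetAsPredicateDemo extends App {
      val stageTaskIds: Set[Long] = Set(1L, 3L)
      val taskUpdates = Map(1L -> 10L, 2L -> 20L, 3L -> 30L)

      // A Set[Long] is a Long => Boolean, so it can serve directly as the predicate.
      val kept = taskUpdates.filterKeys(stageTaskIds)

      println(kept) // Map(1 -> 10, 3 -> 30)
    }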
(diff for the remaining 27 changed files not shown)