Skip to content

Commit

Permalink
Add test and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kuhu Shukla <[email protected]>
  • Loading branch information
kuhushukla committed Dec 26, 2023
1 parent e8f3788 commit 39fbf94
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex

import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory}
import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory, PlatformNames}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
import org.yaml.snakeyaml.{DumperOptions, LoaderOptions, Yaml}
Expand Down Expand Up @@ -591,20 +591,8 @@ class AutoTuner(
}

def calculateJobLevelRecommendations(): Unit = {
val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
val finalShuffleVersion = if (platform.contains("databricks")) {
val dbVersion = appInfoProvider.getProperty(
"spark.databricks.clusterUsageTags.sparkVersion").getOrElse("")
if (dbVersion.contains("10.4")) {
"321db"
} else if (dbVersion.contains("11.3")) {
"330db"
} else {
"332db"
}
} else shuffleManagerVersion
appendRecommendation("spark.shuffle.manager",
"com.nvidia.spark.rapids.spark" + finalShuffleVersion + ".RapidsShuffleManager")
val smClassName = getShuffleManagerClassName
appendRecommendation("spark.shuffle.manager", smClassName)
appendComment(classPathComments("rapids.shuffle.jars"))

recommendFileCache()
Expand All @@ -614,6 +602,21 @@ class AutoTuner(
recommendClassPathEntries()
}

/**
 * Resolves the fully-qualified RapidsShuffleManager class name matching the
 * application's Spark runtime.
 *
 * For Databricks platforms the shim suffix is derived from the Databricks
 * runtime tag ("spark.databricks.clusterUsageTags.sparkVersion", e.g.
 * "11.3.x-gpu-ml-scala2.12"); for all other platforms it is the Spark version
 * string with '(', ')' and '.' stripped (e.g. "3.3.0" -> "330").
 *
 * @return fully-qualified class name, e.g.
 *         "com.nvidia.spark.rapids.spark330db.RapidsShuffleManager"
 */
def getShuffleManagerClassName() : String = {
  // NOTE(review): getSparkVersion is an Option; `.get` throws if the event log
  // recorded no Spark version — confirm callers guarantee it is present.
  val sparkVersionDigits = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
  // Both Databricks flavors use the same runtime-tag-based shim selection.
  val databricksPlatforms = Set(PlatformNames.DATABRICKS_AWS, PlatformNames.DATABRICKS_AZURE)
  val finalShuffleVersion: String = if (databricksPlatforms.contains(platform.getName)) {
    val dbVersion = appInfoProvider.getProperty(
      "spark.databricks.clusterUsageTags.sparkVersion").getOrElse("")
    dbVersion match {
      case ver if ver.contains("10.4") => "321db"
      case ver if ver.contains("11.3") => "330db"
      // Default to the newest supported Databricks shim when the tag is
      // missing or unrecognized.
      case _ => "332db"
    }
  } else {
    sparkVersionDigits
  }
  s"com.nvidia.spark.rapids.spark$finalShuffleVersion.RapidsShuffleManager"
}

/**
* Checks whether the cluster properties are valid.
* If the cluster worker-info is missing entries (i.e., CPU and GPU count), it sets the entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
}

private def buildWorkerInfoAsString(
customProps: Option[mutable.Map[String, String]] = None,
numCores: Option[Int] = Some(32),
systemMemory: Option[String] = Some("122880MiB"),
numWorkers: Option[Int] = Some(4),
gpuCount: Option[Int] = Some(2),
gpuMemory: Option[String] = Some("15109MiB"),
gpuDevice: Option[String] = Some("T4")): String = {
customProps: Option[mutable.Map[String, String]] = None,
numCores: Option[Int] = Some(32),
systemMemory: Option[String] = Some("122880MiB"),
numWorkers: Option[Int] = Some(4),
gpuCount: Option[Int] = Some(2),
gpuMemory: Option[String] = Some("15109MiB"),
gpuDevice: Option[String] = Some("T4")): String = {
val gpuWorkerProps = new GpuWorkerProps(
gpuMemory.getOrElse(""), gpuCount.getOrElse(0), gpuDevice.getOrElse(""))
val cpuSystem = new SystemClusterProps(
numCores.getOrElse(0), systemMemory.getOrElse(""), numWorkers.getOrElse(0))
val systemProperties = customProps match {
case None => mutable.Map[String, String]()
case Some(newProps) => newProps
}
}
val convertedMap = new util.LinkedHashMap[String, String](systemProperties.asJava)
val clusterProps = new ClusterProperties(cpuSystem, gpuWorkerProps, convertedMap)
// set the options to convert the object into formatted yaml content
Expand Down Expand Up @@ -117,13 +117,13 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
}

private def getMockInfoProvider(maxInput: Double,
spilledMetrics: Seq[Long],
jvmGCFractions: Seq[Double],
propsFromLog: mutable.Map[String, String],
sparkVersion: Option[String],
rapidsJars: Seq[String] = Seq(),
distinctLocationPct: Double = 0.0,
redundantReadSize: Long = 0): AppSummaryInfoBaseProvider = {
spilledMetrics: Seq[Long],
jvmGCFractions: Seq[Double],
propsFromLog: mutable.Map[String, String],
sparkVersion: Option[String],
rapidsJars: Seq[String] = Seq(),
distinctLocationPct: Double = 0.0,
redundantReadSize: Long = 0): AppSummaryInfoBaseProvider = {
new AppInfoProviderMockTest(maxInput, spilledMetrics, jvmGCFractions, propsFromLog,
sparkVersion, rapidsJars, distinctLocationPct, redundantReadSize)
}
Expand Down Expand Up @@ -654,7 +654,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(expectedResults == autoTunerOutput)
}

test("test AutoTuner with empty sparkProperties" ) {
test("test AutoTuner with empty sparkProperties") {
val dataprocWorkerInfo = buildWorkerInfoAsString(None)
val expectedResults =
s"""|
Expand Down Expand Up @@ -1090,19 +1090,19 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// 1. The Autotuner should warn the users that they have multiple jars defined in the classPath
// 2. Compare the output
val expectedResults =
s"""|
s"""|
|Spark Properties:
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- ${AutoTuner.classPathComments("rapids.jars.multiple")} [23.06.0, 23.02.1]
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- ${AutoTuner.classPathComments("rapids.jars.multiple")} [23.06.0, 23.02.1]
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
val rapidsJarsArr = Seq("rapids-4-spark_2.12-23.06.0-SNAPSHOT.jar",
"rapids-4-spark_2.12-23.02.1.jar")
val autoTunerOutput = generateRecommendationsForRapidsJars(rapidsJarsArr)
Expand Down Expand Up @@ -1507,4 +1507,20 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("test shuffle manager version for databricks") {
  // A Databricks 11.3 runtime tag must resolve to the spark330db shuffle
  // manager shim, regardless of the underlying Spark version ("3.3.0").
  val sparkProps = mutable.LinkedHashMap(
    "spark.databricks.clusterUsageTags.sparkVersion" -> "11.3.x-gpu-ml-scala2.12")
  val workerInfo = buildWorkerInfoAsString(Some(sparkProps))
  val mockProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
    mutable.Map(
      "spark.rapids.sql.enabled" -> "true",
      "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin",
      "spark.databricks.clusterUsageTags.sparkVersion" -> "11.3.x-gpu-ml-scala2.12"),
    Some("3.3.0"), Seq())
  val tuner = AutoTuner.buildAutoTunerFromProps(workerInfo,
    mockProvider, PlatformFactory.createInstance(PlatformNames.DATABRICKS_AZURE))
  val shuffleManagerClass = tuner.getShuffleManagerClassName()
  assert(shuffleManagerClass == "com.nvidia.spark.rapids.spark330db.RapidsShuffleManager")
}
}

0 comments on commit 39fbf94

Please sign in to comment.