Skip to content

Commit

Permalink
Adding EMR-specific tunings for shuffle manager and ignoring Spark RA…
Browse files Browse the repository at this point in the history
…PIDS jar

Signed-off-by: Matt Ahrens <[email protected]>
  • Loading branch information
mattahrens committed Nov 12, 2024
1 parent 4e783d9 commit a7f27d1
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import scala.util.matching.Regex

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory}
import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory, PlatformNames}
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
Expand Down Expand Up @@ -757,6 +757,14 @@ class AutoTuner(
case ver if ver.contains("11.3") => "330db"
case _ => "332db"
}
} else if (sparkVersion.contains("amzn")) {
sparkVersion match {
case ver if ver.contains("3.5.1") => "351"
case ver if ver.contains("3.5.0") => "350"
case ver if ver.contains("3.4.1") => "341"
case ver if ver.contains("3.4.0") => "340"
case _ => "332"
}
} else {
shuffleManagerVersion
}
Expand Down Expand Up @@ -889,6 +897,10 @@ class AutoTuner(
val missingRapidsJarsEntry = classPathComments("rapids.jars.missing")
val multipleRapidsJarsEntry = classPathComments("rapids.jars.multiple")

if (platform.platformName == PlatformNames.EMR) {
return
}

appInfoProvider.getRapidsJars match {
case Seq() =>
// No rapids jars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2551,4 +2551,71 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Test EMR sets shuffle manager properly and doesn't need Spark RAPIDS jar") {
// mock the properties loaded from eventLog
val logEventsProps: mutable.Map[String, String] =
mutable.LinkedHashMap[String, String](
"spark.executor.cores" -> "32",
"spark.executor.instances" -> "1",
"spark.executor.memory" -> "80g",
"spark.executor.resource.gpu.amount" -> "1",
"spark.executor.instances" -> "1",
)
val emrWorkerInfo = buildWorkerInfoAsString(None, Some(32),
Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString))
val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
logEventsProps, Some("3.4.1-amzn-1"))
val clusterPropsOpt = loadClusterPropertiesFromContent(emrWorkerInfo)
val platform = PlatformFactory.createInstance(PlatformNames.EMR, clusterPropsOpt)
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(emrWorkerInfo, infoProvider,
platform)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.executor.instances=20
|--conf spark.executor.memory=24000m
|--conf spark.executor.memoryOverhead=15564m
|--conf spark.rapids.memory.pinnedPool.size=4096m
|--conf spark.rapids.shuffle.multiThreaded.reader.threads=48
|--conf spark.rapids.shuffle.multiThreaded.writer.threads=48
|--conf spark.rapids.sql.batchSizeBytes=2147483647
|--conf spark.rapids.sql.concurrentGpuTasks=2
|--conf spark.rapids.sql.format.parquet.multithreaded.combine.waitTime=1000
|--conf spark.rapids.sql.multiThreadedRead.numThreads=64
|--conf spark.rapids.sql.reader.multithreaded.combine.sizeBytes=10485760
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark341.RapidsShuffleManager
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionSize=4m
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|--conf spark.task.resource.gpu.amount=0.03125
|
|Comments:
|- 'spark.executor.memoryOverhead' was not set.
|- 'spark.rapids.memory.pinnedPool.size' was not set.
|- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
|- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
|- 'spark.rapids.sql.batchSizeBytes' was not set.
|- 'spark.rapids.sql.concurrentGpuTasks' was not set.
|- 'spark.rapids.sql.format.parquet.multithreaded.combine.waitTime' was not set.
|- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
|- 'spark.rapids.sql.reader.multithreaded.combine.sizeBytes' was not set.
|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionSize' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' was not set.
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}
}

0 comments on commit a7f27d1

Please sign in to comment.