From a7f27d10339ec5901af66ccbcadffdcd3b0c19e6 Mon Sep 17 00:00:00 2001 From: Matt Ahrens Date: Tue, 12 Nov 2024 14:45:25 -0600 Subject: [PATCH] Adding EMR-specific tunings for shuffle manager and ignoring Spark RAPIDS jar Signed-off-by: Matt Ahrens --- .../rapids/tool/profiling/AutoTuner.scala | 14 +++- .../tool/profiling/AutoTunerSuite.scala | 67 +++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala index 6d1a2c165..7fc0bbd23 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer import scala.util.control.NonFatal import scala.util.matching.Regex -import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory} +import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory, PlatformNames} import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path} @@ -757,6 +757,14 @@ class AutoTuner( case ver if ver.contains("11.3") => "330db" case _ => "332db" } + } else if (sparkVersion.contains("amzn")) { + sparkVersion match { + case ver if ver.contains("3.5.1") => "351" + case ver if ver.contains("3.5.0") => "350" + case ver if ver.contains("3.4.1") => "341" + case ver if ver.contains("3.4.0") => "340" + case _ => "332" + } } else { shuffleManagerVersion } @@ -889,6 +897,10 @@ class AutoTuner( val missingRapidsJarsEntry = classPathComments("rapids.jars.missing") val multipleRapidsJarsEntry = classPathComments("rapids.jars.multiple") + if (platform.platformName == PlatformNames.EMR) { + return + } + appInfoProvider.getRapidsJars match { case Seq() => // No rapids jars diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala index cc2f806eb..d9d325682 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala @@ -2551,4 +2551,71 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) } + + test("Test EMR sets shuffle manager properly and doesn't need Spark RAPIDS jar") { + // mock the properties loaded from eventLog + val logEventsProps: mutable.Map[String, String] = + mutable.LinkedHashMap[String, String]( + "spark.executor.cores" -> "32", + "spark.executor.instances" -> "1", + "spark.executor.memory" -> "80g", + "spark.executor.resource.gpu.amount" -> "1", + "spark.executor.instances" -> "1", + ) + val emrWorkerInfo = buildWorkerInfoAsString(None, Some(32), + Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) + val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), + logEventsProps, Some("3.4.1-amzn-1")) + val clusterPropsOpt = loadClusterPropertiesFromContent(emrWorkerInfo) + val platform = PlatformFactory.createInstance(PlatformNames.EMR, clusterPropsOpt) + val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(emrWorkerInfo, infoProvider, + platform) + val (properties, comments) = autoTuner.getRecommendedProperties() + val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) + // scalastyle:off line.size.limit + val expectedResults = + s"""| + |Spark Properties: + |--conf spark.executor.instances=20 + |--conf spark.executor.memory=24000m + |--conf spark.executor.memoryOverhead=15564m + |--conf spark.rapids.memory.pinnedPool.size=4096m + |--conf spark.rapids.shuffle.multiThreaded.reader.threads=48 + |--conf spark.rapids.shuffle.multiThreaded.writer.threads=48 + |--conf spark.rapids.sql.batchSizeBytes=2147483647 + |--conf spark.rapids.sql.concurrentGpuTasks=2 + |--conf spark.rapids.sql.format.parquet.multithreaded.combine.waitTime=1000 + |--conf spark.rapids.sql.multiThreadedRead.numThreads=64 + |--conf spark.rapids.sql.reader.multithreaded.combine.sizeBytes=10485760 + |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark341.RapidsShuffleManager + |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m + |--conf spark.sql.adaptive.coalescePartitions.minPartitionSize=4m + |--conf spark.sql.files.maxPartitionBytes=512m + |--conf spark.sql.shuffle.partitions=200 + |--conf spark.task.resource.gpu.amount=0.03125 + | + |Comments: + |- 'spark.executor.memoryOverhead' was not set. + |- 'spark.rapids.memory.pinnedPool.size' was not set. + |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set. + |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set. + |- 'spark.rapids.sql.batchSizeBytes' was not set. + |- 'spark.rapids.sql.concurrentGpuTasks' was not set. + |- 'spark.rapids.sql.format.parquet.multithreaded.combine.waitTime' was not set. + |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set. + |- 'spark.rapids.sql.reader.multithreaded.combine.sizeBytes' was not set. + |- 'spark.shuffle.manager' was not set. + |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set. + |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. + |- 'spark.sql.adaptive.coalescePartitions.minPartitionSize' was not set. + |- 'spark.sql.adaptive.enabled' should be enabled for better performance. + |- 'spark.sql.files.maxPartitionBytes' was not set. + |- 'spark.sql.shuffle.partitions' was not set. + |- 'spark.task.resource.gpu.amount' was not set. + |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html + |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |""".stripMargin + // scalastyle:on line.size.limit + assert(expectedResults == autoTunerOutput) + } }