From 526a073c9ce78806f4ce2c37c4402e88a10497f0 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 31 Dec 2024 14:00:43 -0800 Subject: [PATCH 1/9] Add checks for supported RapidsShuffleManager versions Signed-off-by: Partho Sarthi --- .../spark/rapids/tool/tuning/AutoTuner.scala | 146 ++++++++++++++---- .../tool/tuning/ProfilingAutoTunerSuite.scala | 113 +++++++++++++- 2 files changed, 225 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index 7471491e4..f05ab77a6 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -264,6 +264,112 @@ class RecommendationEntry(val name: String, } } +// scalastyle:off line.size.limit +/** + * Resolves the appropriate RapidsShuffleManager class name based on Spark or Databricks version. + * + * Note: + * - Supported RapidsShuffleManagers: https://docs.nvidia.com/spark-rapids/user-guide/latest/additional-functionality/rapids-shuffle.html#rapids-shuffle-manager + * - Version mappings need to be updated as new versions are supported. + * - This can be extended to support more version mappings (e.g. Cloudera). + */ +// scalastyle:on line.size.limit +object ShuffleManagerResolver { + // Databricks version to RapidsShuffleManager version mapping. + private val DatabricksVersionMap = Map( + "11.3" -> "330db", + "12.3" -> "332db", + "13.3" -> "341db" + ) + + // Spark version to RapidsShuffleManager version mapping. + private val SparkVersionMap = Map( + "3.2.0" -> "320", + "3.2.1" -> "321", + "3.2.2" -> "322", + "3.2.3" -> "323", + "3.2.4" -> "324", + "3.3.0" -> "330", + "3.3.1" -> "331", + "3.3.2" -> "332", + "3.3.3" -> "333", + "3.3.4" -> "334", + "3.4.0" -> "340", + "3.4.1" -> "341", + "3.4.2" -> "342", + "3.4.3" -> "343", + "3.5.0" -> "350", + "3.5.1" -> "351", + "4.0.0" -> "400" + ) + + def buildShuffleManagerClassName(smVersion: String): String = { + s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager" + } + + def commentForUnsupportedVersion(sparkVersion: String): String = { + s"Could not recommend RapidsShuffleManager as the provided version " + + s"$sparkVersion is not supported." + } + + def commentForMissingVersion: String = { + "Could not recommend RapidsShuffleManager as neither Spark nor Databricks version is provided." + } + + /** + * Internal method to determine the appropriate RapidsShuffleManager class name based on the + * provided databricks or spark version. + * + * Example: + * sparkVersion: "3.2.0-amzn-1" + * versionMap: {"3.2.0" -> "320", "3.2.1" -> "321"} + * Then, smVersion: "320" + * + * sparkVersion: "13.3-ml-1" + * versionMap: {"11.3" -> "330db", "12.3" -> "332db", "13.3" -> "341db"} + * Then, smVersion: "341db" + * + * sparkVersion: "3.1.2" + * versionMap: {"3.2.0" -> "320", "3.2.1" -> "321"} + * Then, smVersion: None + * + * @return Either an error message (Left) or the RapidsShuffleManager class name (Right) + */ + private def getClassNameInternal( + versionMap: Map[String, String], sparkVersion: String): Either[String, String] = { + val smVersionOpt = versionMap.collectFirst { + case (key, value) if sparkVersion.contains(key) => value + } + smVersionOpt match { + case Some(smVersion) => + Right(buildShuffleManagerClassName(smVersion)) + case None => + Left(commentForUnsupportedVersion(sparkVersion)) + } + } + + /** + * Determines the appropriate RapidsShuffleManager class name based on the provided versions. + * Databricks version takes precedence over Spark version. If a valid class name is not found, + * an error message is returned. + * + * @param dbVersion Databricks version. + * @param sparkVersion Spark version. + * @return Either an error message (Left) or the RapidsShuffleManager class name (Right) + */ + def getClassName( + dbVersion: Option[String], sparkVersion: Option[String]): Either[String, String] = { + (dbVersion, sparkVersion) match { + case (Some(dbVer), _) => + getClassNameInternal(DatabricksVersionMap, dbVer) + case (None, Some(sparkVer)) => + getClassNameInternal(SparkVersionMap, sparkVer) + case _ => + Left(commentForMissingVersion) + } + } +} + /** * AutoTuner module that uses event logs and worker's system properties to recommend Spark * RAPIDS configuration based on heuristics. @@ -717,9 +823,11 @@ class AutoTuner( def calculateJobLevelRecommendations(): Unit = { // TODO - do we do anything with 200 shuffle partitions or maybe if its close // set the Spark config spark.shuffle.sort.bypassMergeThreshold - getShuffleManagerClassName match { - case Some(smClassName) => appendRecommendation("spark.shuffle.manager", smClassName) - case None => appendComment("Could not define the Spark Version") + getShuffleManagerClassName match { + case Right(smClassName) => + appendRecommendation("spark.shuffle.manager", smClassName) + case Left(errMessage) => + appendComment(errMessage) } appendComment(autoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")) recommendFileCache() @@ -752,31 +860,13 @@ class AutoTuner( } } - def getShuffleManagerClassName() : Option[String] = { - appInfoProvider.getSparkVersion.map { sparkVersion => - val shuffleManagerVersion = sparkVersion.filterNot("().".toSet) - val dbVersion = getPropertyValue( - DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY).getOrElse("") - val finalShuffleVersion : String = if (dbVersion.nonEmpty) { - dbVersion match { - case ver if ver.contains("10.4") => "321db" - case ver if ver.contains("11.3") => "330db" - case _ => "332db" - } - } else if (sparkVersion.contains("amzn")) { - sparkVersion match { - case ver if ver.contains("3.5.2") => "352" - case ver if ver.contains("3.5.1") => "351" - case ver if ver.contains("3.5.0") => "350" - case ver if ver.contains("3.4.1") => "341" - case ver if ver.contains("3.4.0") => "340" - case _ => "332" - } - } else { - shuffleManagerVersion - } - "com.nvidia.spark.rapids.spark" + finalShuffleVersion + ".RapidsShuffleManager" - } + /** + * Resolves the RapidsShuffleManager class name based on the Spark or Databricks version. + * If a valid class name is not found an error message is appended as a comment. + */ + def getShuffleManagerClassName: Either[String, String] = { + val dbVersion = getPropertyValue(DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY) + ShuffleManagerResolver.getClassName(dbVersion, appInfoProvider.getSparkVersion) } /** diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala index bf3f6c61b..af9a28a26 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala @@ -2183,7 +2183,22 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." assert(expectedResults == autoTunerOutput) } - test("test shuffle manager version for databricks") { + /** + * Helper method to verify that the recommended shuffle manager version matches the + * expected version. + */ + private def verifyRecommendedShuffleManagerVersion( + autoTuner: AutoTuner, + expectedSmVersion: String): Unit = { + autoTuner.getShuffleManagerClassName match { + case Right(smVersion) => + assert(smVersion == ShuffleManagerResolver.buildShuffleManagerClassName(expectedSmVersion)) + case Left(comment) => + fail(s"Expected valid RapidsShuffleManager but got comment: $comment") + } + } + + test("test shuffle manager version for supported databricks") { val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", @@ -2194,12 +2209,11 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." val autoTuner = ProfilingAutoTunerConfigsProvider .buildAutoTunerFromProps(databricksWorkerInfo, infoProvider, PlatformFactory.createInstance()) - val smVersion = autoTuner.getShuffleManagerClassName() // Assert shuffle manager string for DB 11.3 tag - assert(smVersion.get == "com.nvidia.spark.rapids.spark330db.RapidsShuffleManager") + verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330db") } - test("test shuffle manager version for non-databricks") { + test("test shuffle manager version for supported non-databricks") { val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", @@ -2208,8 +2222,95 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." val autoTuner = ProfilingAutoTunerConfigsProvider .buildAutoTunerFromProps(databricksWorkerInfo, infoProvider, PlatformFactory.createInstance()) - val smVersion = autoTuner.getShuffleManagerClassName() - assert(smVersion.get == "com.nvidia.spark.rapids.spark330.RapidsShuffleManager") + // Assert shuffle manager string for supported Spark v3.3.0 + verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330") + } + + test("test shuffle manager version for supported custom version") { + val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), + mutable.Map("spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), + Some("3.3.0-custom"), Seq()) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, + infoProvider, PlatformFactory.createInstance()) + // Assert shuffle manager string for supported custom Spark v3.3.0 + verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330") + } + + /** + * Helper method to verify that the shuffle manager version is not recommended + * for the unsupported Spark version. + */ + private def verifyUnsupportedSparkVersionForShuffleManager( + autoTuner: AutoTuner, + sparkVersion: String): Unit = { + autoTuner.getShuffleManagerClassName match { + case Right(smVersion) => + fail(s"Expected error comment but got valid RapidsShuffleManager with version $smVersion") + case Left(comment) => + assert(comment == ShuffleManagerResolver.commentForUnsupportedVersion(sparkVersion)) + } + } + + test("test shuffle manager version for unsupported databricks") { + val databricksVersion = "9.1.x-gpu-ml-scala2.12" + val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), + mutable.Map("spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin", + DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY -> databricksVersion), + Some(databricksVersion), Seq()) + // Do not set the platform as DB to see if it can work correctly irrespective + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, + infoProvider, PlatformFactory.createInstance()) + verifyUnsupportedSparkVersionForShuffleManager(autoTuner, databricksVersion) + } + + test("test shuffle manager version for unsupported non-databricks") { + val sparkVersion = "3.1.2" + val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), + mutable.Map("spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), + Some(sparkVersion), Seq()) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, + infoProvider, PlatformFactory.createInstance()) + verifyUnsupportedSparkVersionForShuffleManager(autoTuner, sparkVersion) + } + + test("test shuffle manager version for unsupported custom version") { + val customSparkVersion = "3.1.2-custom" + val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), + mutable.Map("spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), + Some(customSparkVersion), Seq()) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, + infoProvider, PlatformFactory.createInstance()) + verifyUnsupportedSparkVersionForShuffleManager(autoTuner, customSparkVersion) + } + + test("test shuffle manager version for missing spark version") { + val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), + mutable.Map("spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), + None, Seq()) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, + infoProvider, PlatformFactory.createInstance()) + // Verify that the shuffle manager is not recommended for missing Spark version + autoTuner.getShuffleManagerClassName match { + case Right(smVersion) => + fail(s"Expected error comment but got valid RapidsShuffleManager with version $smVersion") + case Left(comment) => + assert(comment == ShuffleManagerResolver.commentForMissingVersion) + } } test("Test spilling occurred in shuffle stages") { From 12595344f98bd061c9187435260ce0d6dfa44728 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Thu, 2 Jan 2025 14:36:53 -0800 Subject: [PATCH 2/9] Minor refactoring Signed-off-by: Partho Sarthi --- .../spark/rapids/tool/tuning/AutoTuner.scala | 69 +++++++++---------- .../tool/tuning/ProfilingAutoTunerSuite.scala | 25 +++---- 2 files changed, 44 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index f05ab77a6..c10f0f099 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -275,15 +275,15 @@ class RecommendationEntry(val name: String, */ // scalastyle:on line.size.limit object ShuffleManagerResolver { - // Databricks version to RapidsShuffleManager version mapping. - private val DatabricksVersionMap = Map( + // Supported Databricks version to RapidsShuffleManager version mapping. + private val supportedDatabricksVersionMap = Array( "11.3" -> "330db", "12.3" -> "332db", "13.3" -> "341db" ) - // Spark version to RapidsShuffleManager version mapping. - private val SparkVersionMap = Map( + // Supported Spark version to RapidsShuffleManager version mapping. + private val supportedSparkVersionMap = Array( "3.2.0" -> "320", "3.2.1" -> "321", "3.2.2" -> "322", @@ -299,8 +299,7 @@ object ShuffleManagerResolver { "3.4.2" -> "342", "3.4.3" -> "343", "3.5.0" -> "350", - "3.5.1" -> "351", - "4.0.0" -> "400" + "3.5.1" -> "351" ) def buildShuffleManagerClassName(smVersion: String): String = { @@ -322,36 +321,35 @@ object ShuffleManagerResolver { * * Example: * sparkVersion: "3.2.0-amzn-1" - * versionMap: {"3.2.0" -> "320", "3.2.1" -> "321"} - * Then, smVersion: "320" + * supportedVersionsMap: ["3.2.0" -> "320", "3.2.1" -> "321"] + * return: Right("com.nvidia.spark.rapids.spark320.RapidsShuffleManager") * - * sparkVersion: "13.3-ml-1" - * versionMap: {"11.3" -> "330db", "12.3" -> "332db", "13.3" -> "341db"} - * Then, smVersion: "341db" + * sparkVersion: "13.3.x-gpu-ml-scala2.12" + * supportedVersionsMap: ["11.3" -> "330db", "12.3" -> "332db", "13.3" -> "341db"] + * return: Right("com.nvidia.spark.rapids.spark341db.RapidsShuffleManager") * * sparkVersion: "3.1.2" - * versionMap: {"3.2.0" -> "320", "3.2.1" -> "321"} - * Then, smVersion: None + * supportedVersionsMap: ["3.2.0" -> "320", "3.2.1" -> "321"] + * return: Left("Could not recommend RapidsShuffleManager as the provided version + * 3.1.2 is not supported.") * * @return Either an error message (Left) or the RapidsShuffleManager class name (Right) */ private def getClassNameInternal( - versionMap: Map[String, String], sparkVersion: String): Either[String, String] = { - val smVersionOpt = versionMap.collectFirst { - case (key, value) if sparkVersion.contains(key) => value - } - smVersionOpt match { - case Some(smVersion) => - Right(buildShuffleManagerClassName(smVersion)) - case None => - Left(commentForUnsupportedVersion(sparkVersion)) + supportedVersionsMap: Array[(String, String)], + sparkVersion: String): Either[String, String] = { + supportedVersionsMap.collectFirst { + case (supportedVersion, smVersion) if sparkVersion.contains(supportedVersion) => smVersion + } match { + case Some(smVersion) => Right(buildShuffleManagerClassName(smVersion)) + case None => Left(commentForUnsupportedVersion(sparkVersion)) } } /** - * Determines the appropriate RapidsShuffleManager class name based on the provided versions. - * Databricks version takes precedence over Spark version. If a valid class name is not found, - * an error message is returned. + * Determines the appropriate RapidsShuffleManager class name based on the provided Databricks or + * Spark version. Databricks version takes precedence over Spark version. If a valid class name + * is not found, an error message is returned. * * @param dbVersion Databricks version. * @param sparkVersion Spark version. @@ -360,12 +358,9 @@ object ShuffleManagerResolver { def getClassName( dbVersion: Option[String], sparkVersion: Option[String]): Either[String, String] = { (dbVersion, sparkVersion) match { - case (Some(dbVer), _) => - getClassNameInternal(DatabricksVersionMap, dbVer) - case (None, Some(sparkVer)) => - getClassNameInternal(SparkVersionMap, sparkVer) - case _ => - Left(commentForMissingVersion) + case (Some(dbVer), _) => getClassNameInternal(supportedDatabricksVersionMap, dbVer) + case (None, Some(sparkVer)) => getClassNameInternal(supportedSparkVersionMap, sparkVer) + case _ => Left(commentForMissingVersion) } } } @@ -824,10 +819,8 @@ class AutoTuner( // TODO - do we do anything with 200 shuffle partitions or maybe if its close // set the Spark config spark.shuffle.sort.bypassMergeThreshold getShuffleManagerClassName match { - case Right(smClassName) => - appendRecommendation("spark.shuffle.manager", smClassName) - case Left(errMessage) => - appendComment(errMessage) + case Right(smClassName) => appendRecommendation("spark.shuffle.manager", smClassName) + case Left(comment) => appendComment(comment) } appendComment(autoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")) recommendFileCache() @@ -861,8 +854,8 @@ class AutoTuner( } /** - * Resolves the RapidsShuffleManager class name based on the Spark or Databricks version. - * If a valid class name is not found an error message is appended as a comment. + * Resolves the RapidsShuffleManager class name based on the Databricks or Spark version. + * If a valid class name is not found, an error message is returned. */ def getShuffleManagerClassName: Either[String, String] = { val dbVersion = getPropertyValue(DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala index af9a28a26..f6acc325e 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala @@ -2191,14 +2191,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." autoTuner: AutoTuner, expectedSmVersion: String): Unit = { autoTuner.getShuffleManagerClassName match { - case Right(smVersion) => - assert(smVersion == ShuffleManagerResolver.buildShuffleManagerClassName(expectedSmVersion)) + case Right(smClassName) => + assert(smClassName == + ShuffleManagerResolver.buildShuffleManagerClassName(expectedSmVersion)) case Left(comment) => fail(s"Expected valid RapidsShuffleManager but got comment: $comment") } } - test("test shuffle manager version for supported databricks") { + test("test shuffle manager version for supported databricks version") { val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", @@ -2213,7 +2214,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330db") } - test("test shuffle manager version for supported non-databricks") { + test("test shuffle manager version for supported spark version") { val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", @@ -2226,7 +2227,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330") } - test("test shuffle manager version for supported custom version") { + test("test shuffle manager version for supported custom spark version") { val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", @@ -2247,14 +2248,14 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." autoTuner: AutoTuner, sparkVersion: String): Unit = { autoTuner.getShuffleManagerClassName match { - case Right(smVersion) => - fail(s"Expected error comment but got valid RapidsShuffleManager with version $smVersion") + case Right(smClassName) => + fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => assert(comment == ShuffleManagerResolver.commentForUnsupportedVersion(sparkVersion)) } } - test("test shuffle manager version for unsupported databricks") { + test("test shuffle manager version for unsupported databricks version") { val databricksVersion = "9.1.x-gpu-ml-scala2.12" val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), @@ -2269,7 +2270,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." verifyUnsupportedSparkVersionForShuffleManager(autoTuner, databricksVersion) } - test("test shuffle manager version for unsupported non-databricks") { + test("test shuffle manager version for unsupported spark version") { val sparkVersion = "3.1.2" val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), @@ -2282,7 +2283,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." verifyUnsupportedSparkVersionForShuffleManager(autoTuner, sparkVersion) } - test("test shuffle manager version for unsupported custom version") { + test("test shuffle manager version for unsupported custom spark version") { val customSparkVersion = "3.1.2-custom" val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), @@ -2306,8 +2307,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." infoProvider, PlatformFactory.createInstance()) // Verify that the shuffle manager is not recommended for missing Spark version autoTuner.getShuffleManagerClassName match { - case Right(smVersion) => - fail(s"Expected error comment but got valid RapidsShuffleManager with version $smVersion") + case Right(smClassName) => + fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => assert(comment == ShuffleManagerResolver.commentForMissingVersion) } From 1ccec41af01cf11590e3b7b5e9e85f3730c6ad16 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Thu, 2 Jan 2025 21:25:06 -0800 Subject: [PATCH 3/9] Add doc url in error comment Signed-off-by: Partho Sarthi --- .../com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index c10f0f099..68291baf2 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -302,13 +302,16 @@ object ShuffleManagerResolver { "3.5.1" -> "351" ) + private val shuffleManagerDocUrl = "https://docs.nvidia.com/spark-rapids/user-guide/latest/" + + "additional-functionality/rapids-shuffle.html#rapids-shuffle-manager" + def buildShuffleManagerClassName(smVersion: String): String = { s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager" } def commentForUnsupportedVersion(sparkVersion: String): String = { - s"Could not recommend RapidsShuffleManager as the provided version " + - s"$sparkVersion is not supported." + s"Cannot recommend RAPIDS Shuffle Manager for unsupported \'$sparkVersion\' version.\n" + + s" See supported versions: $shuffleManagerDocUrl." } def commentForMissingVersion: String = { From 7842e9d7a5c5e1b0a36d484c989a4f344867a275 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 3 Jan 2025 13:40:43 -0800 Subject: [PATCH 4/9] Refactor to make shuffle manager validation platform specific Signed-off-by: Partho Sarthi --- .../nvidia/spark/rapids/tool/Platform.scala | 42 ++++- .../spark/rapids/tool/tuning/AutoTuner.scala | 145 +++++------------- .../tool/tuning/ProfilingAutoTunerSuite.scala | 42 ++--- 3 files changed, 101 insertions(+), 128 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala index 866e1fbbd..91872e4b1 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -139,6 +139,39 @@ abstract class Platform(var gpuDevice: Option[GpuDevice], SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS ) + // scalastyle:off line.size.limit + // Supported Spark version to RapidsShuffleManager version mapping. + // Reference: https://docs.nvidia.com/spark-rapids/user-guide/latest/additional-functionality/rapids-shuffle.html#rapids-shuffle-manager + // scalastyle:on line.size.limit + val supportedShuffleManagerVersionMap: Array[(String, String)] = Array( + "3.2.0" -> "320", + "3.2.1" -> "321", + "3.2.2" -> "322", + "3.2.3" -> "323", + "3.2.4" -> "324", + "3.3.0" -> "330", + "3.3.1" -> "331", + "3.3.2" -> "332", + "3.3.3" -> "333", + "3.3.4" -> "334", + "3.4.0" -> "340", + "3.4.1" -> "341", + "3.4.2" -> "342", + "3.4.3" -> "343", + "3.5.0" -> "350", + "3.5.1" -> "351" + ) + + /** + * Determine the appropriate RapidsShuffleManager version based on the + * provided spark version. + */ + def getShuffleManagerVersion(sparkVersion: String): Option[String] = { + supportedShuffleManagerVersionMap.collectFirst { + case (supportedVersion, smVersion) if sparkVersion.contains(supportedVersion) => smVersion + } + } + /** * Checks if the given runtime is supported by the platform. */ @@ -538,6 +571,13 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice], "spark.executor.memoryOverhead" ) + // Supported Databricks version to RapidsShuffleManager version mapping. + override val supportedShuffleManagerVersionMap: Array[(String, String)] = Array( + "11.3" -> "330db", + "12.2" -> "332db", + "13.3" -> "341db" + ) + override def createClusterInfo(coresPerExecutor: Int, numExecsPerNode: Int, numExecs: Int, diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index 68291baf2..f6b66dd7f 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -27,7 +27,6 @@ import scala.util.control.NonFatal import scala.util.matching.Regex import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory} -import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper import com.nvidia.spark.rapids.tool.profiling._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path} @@ -264,110 +263,6 @@ class RecommendationEntry(val name: String, } } -// scalastyle:off line.size.limit -/** - * Resolves the appropriate RapidsShuffleManager class name based on Spark or Databricks version. - * - * Note: - * - Supported RapidsShuffleManagers: https://docs.nvidia.com/spark-rapids/user-guide/latest/additional-functionality/rapids-shuffle.html#rapids-shuffle-manager - * - Version mappings need to be updated as new versions are supported. - * - This can be extended to support more version mappings (e.g. Cloudera). - */ -// scalastyle:on line.size.limit -object ShuffleManagerResolver { - // Supported Databricks version to RapidsShuffleManager version mapping. - private val supportedDatabricksVersionMap = Array( - "11.3" -> "330db", - "12.3" -> "332db", - "13.3" -> "341db" - ) - - // Supported Spark version to RapidsShuffleManager version mapping. - private val supportedSparkVersionMap = Array( - "3.2.0" -> "320", - "3.2.1" -> "321", - "3.2.2" -> "322", - "3.2.3" -> "323", - "3.2.4" -> "324", - "3.3.0" -> "330", - "3.3.1" -> "331", - "3.3.2" -> "332", - "3.3.3" -> "333", - "3.3.4" -> "334", - "3.4.0" -> "340", - "3.4.1" -> "341", - "3.4.2" -> "342", - "3.4.3" -> "343", - "3.5.0" -> "350", - "3.5.1" -> "351" - ) - - private val shuffleManagerDocUrl = "https://docs.nvidia.com/spark-rapids/user-guide/latest/" + - "additional-functionality/rapids-shuffle.html#rapids-shuffle-manager" - - def buildShuffleManagerClassName(smVersion: String): String = { - s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager" - } - - def commentForUnsupportedVersion(sparkVersion: String): String = { - s"Cannot recommend RAPIDS Shuffle Manager for unsupported \'$sparkVersion\' version.\n" + - s" See supported versions: $shuffleManagerDocUrl." - } - - def commentForMissingVersion: String = { - "Could not recommend RapidsShuffleManager as neither Spark nor Databricks version is provided." - } - - /** - * Internal method to determine the appropriate RapidsShuffleManager class name based on the - * provided databricks or spark version. - * - * Example: - * sparkVersion: "3.2.0-amzn-1" - * supportedVersionsMap: ["3.2.0" -> "320", "3.2.1" -> "321"] - * return: Right("com.nvidia.spark.rapids.spark320.RapidsShuffleManager") - * - * sparkVersion: "13.3.x-gpu-ml-scala2.12" - * supportedVersionsMap: ["11.3" -> "330db", "12.3" -> "332db", "13.3" -> "341db"] - * return: Right("com.nvidia.spark.rapids.spark341db.RapidsShuffleManager") - * - * sparkVersion: "3.1.2" - * supportedVersionsMap: ["3.2.0" -> "320", "3.2.1" -> "321"] - * return: Left("Could not recommend RapidsShuffleManager as the provided version - * 3.1.2 is not supported.") - * - * @return Either an error message (Left) or the RapidsShuffleManager class name (Right) - */ - private def getClassNameInternal( - supportedVersionsMap: Array[(String, String)], - sparkVersion: String): Either[String, String] = { - supportedVersionsMap.collectFirst { - case (supportedVersion, smVersion) if sparkVersion.contains(supportedVersion) => smVersion - } match { - case Some(smVersion) => Right(buildShuffleManagerClassName(smVersion)) - case None => Left(commentForUnsupportedVersion(sparkVersion)) - } - } - - /** - * Determines the appropriate RapidsShuffleManager class name based on the provided Databricks or - * Spark version. Databricks version takes precedence over Spark version. If a valid class name - * is not found, an error message is returned. - * - * @param dbVersion Databricks version. - * @param sparkVersion Spark version. - * @return Either an error message (Left) or the RapidsShuffleManager class name (Right) - */ - def getClassName( - dbVersion: Option[String], sparkVersion: Option[String]): Either[String, String] = { - (dbVersion, sparkVersion) match { - case (Some(dbVer), _) => getClassNameInternal(supportedDatabricksVersionMap, dbVer) - case (None, Some(sparkVer)) => getClassNameInternal(supportedSparkVersionMap, sparkVer) - case _ => Left(commentForMissingVersion) - } - } -} - /** * AutoTuner module that uses event logs and worker's system properties to recommend Spark * RAPIDS configuration based on heuristics. @@ -857,12 +752,30 @@ class AutoTuner( } /** - * Resolves the RapidsShuffleManager class name based on the Databricks or Spark version. + * Resolves the RapidsShuffleManager class name based on the Spark version. * If a valid class name is not found, an error message is returned. + * + * Example: + * sparkVersion: "3.2.0-amzn-1" + * return: Right("com.nvidia.spark.rapids.spark320.RapidsShuffleManager") + * + * sparkVersion: "3.1.2" + * return: Left("Cannot recommend RAPIDS Shuffle Manager for unsupported '3.1.2' version.") + * + * @return Either an error message (Left) or the RapidsShuffleManager class name (Right) */ def getShuffleManagerClassName: Either[String, String] = { - val dbVersion = getPropertyValue(DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY) - ShuffleManagerResolver.getClassName(dbVersion, appInfoProvider.getSparkVersion) + appInfoProvider.getSparkVersion match { + case Some(sparkVersion) => + platform.getShuffleManagerVersion(sparkVersion) match { + case Some(smVersion) => + Right(autoTunerConfigsProvider.buildShuffleManagerClassName(smVersion)) + case None => + Left(autoTunerConfigsProvider.shuffleManagerCommentForUnsupportedVersion(sparkVersion)) + } + case None => + Left(autoTunerConfigsProvider.shuffleManagerCommentForMissingVersion) + } } /** @@ -1430,6 +1343,9 @@ trait AutoTunerConfigsProvider extends Logging { // the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r + private val shuffleManagerDocUrl = "https://docs.nvidia.com/spark-rapids/user-guide/latest/" + + "additional-functionality/rapids-shuffle.html#rapids-shuffle-manager" + /** * Abstract method to create an instance of the AutoTuner. */ @@ -1546,6 +1462,19 @@ trait AutoTunerConfigsProvider extends Logging { case _ => true } } + + def buildShuffleManagerClassName(smVersion: String): String = { + s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager" + } + + def shuffleManagerCommentForUnsupportedVersion(sparkVersion: String): String = { + s"Cannot recommend RAPIDS Shuffle Manager for unsupported \'$sparkVersion\' version.\n" + + s" See supported versions: $shuffleManagerDocUrl." + } + + def shuffleManagerCommentForMissingVersion: String = { + "Could not recommend RapidsShuffleManager as Spark version cannot be determined." + } } /** diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala index f6acc325e..f458d2dec 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala @@ -2193,48 +2193,51 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." autoTuner.getShuffleManagerClassName match { case Right(smClassName) => assert(smClassName == - ShuffleManagerResolver.buildShuffleManagerClassName(expectedSmVersion)) + ProfilingAutoTunerConfigsProvider.buildShuffleManagerClassName(expectedSmVersion)) case Left(comment) => fail(s"Expected valid RapidsShuffleManager but got comment: $comment") } } test("test shuffle manager version for supported databricks version") { + val databricksVersion = "11.3.x-gpu-ml-scala2.12" val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin", - DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY -> "11.3.x-gpu-ml-scala2.12"), - Some("3.3.0"), Seq()) + DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY -> databricksVersion), + Some(databricksVersion), Seq()) // Do not set the platform as DB to see if it can work correctly irrespective val autoTuner = ProfilingAutoTunerConfigsProvider .buildAutoTunerFromProps(databricksWorkerInfo, - infoProvider, PlatformFactory.createInstance()) + infoProvider, PlatformFactory.createInstance(PlatformNames.DATABRICKS_AWS)) // Assert shuffle manager string for DB 11.3 tag verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330db") } test("test shuffle manager version for supported spark version") { - val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val sparkVersion = "3.3.0" + val workerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), - Some("3.3.0"), Seq()) + Some(sparkVersion), Seq()) val autoTuner = ProfilingAutoTunerConfigsProvider - .buildAutoTunerFromProps(databricksWorkerInfo, + .buildAutoTunerFromProps(workerInfo, infoProvider, PlatformFactory.createInstance()) // Assert shuffle manager string for supported Spark v3.3.0 verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330") } test("test shuffle manager version for supported custom spark version") { - val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val customSparkVersion = "3.3.0-custom" + val workerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), - Some("3.3.0-custom"), Seq()) + Some(customSparkVersion), Seq()) val autoTuner = ProfilingAutoTunerConfigsProvider - .buildAutoTunerFromProps(databricksWorkerInfo, + .buildAutoTunerFromProps(workerInfo, infoProvider, PlatformFactory.createInstance()) // Assert shuffle manager string for supported custom Spark v3.3.0 verifyRecommendedShuffleManagerVersion(autoTuner, expectedSmVersion="330") @@ -2251,7 +2254,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." case Right(smClassName) => fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => - assert(comment == ShuffleManagerResolver.commentForUnsupportedVersion(sparkVersion)) + assert(comment == + ProfilingAutoTunerConfigsProvider.shuffleManagerCommentForUnsupportedVersion(sparkVersion)) } } @@ -2266,51 +2270,51 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." // Do not set the platform as DB to see if it can work correctly irrespective val autoTuner = ProfilingAutoTunerConfigsProvider .buildAutoTunerFromProps(databricksWorkerInfo, - infoProvider, PlatformFactory.createInstance()) + infoProvider, PlatformFactory.createInstance(PlatformNames.DATABRICKS_AWS)) verifyUnsupportedSparkVersionForShuffleManager(autoTuner, databricksVersion) } test("test shuffle manager version for unsupported spark version") { val sparkVersion = "3.1.2" - val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val workerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), Some(sparkVersion), Seq()) val autoTuner = ProfilingAutoTunerConfigsProvider - .buildAutoTunerFromProps(databricksWorkerInfo, + .buildAutoTunerFromProps(workerInfo, infoProvider, PlatformFactory.createInstance()) verifyUnsupportedSparkVersionForShuffleManager(autoTuner, sparkVersion) } test("test shuffle manager version for unsupported custom spark version") { val customSparkVersion = "3.1.2-custom" - val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val workerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), Some(customSparkVersion), Seq()) val autoTuner = ProfilingAutoTunerConfigsProvider - .buildAutoTunerFromProps(databricksWorkerInfo, + .buildAutoTunerFromProps(workerInfo, infoProvider, PlatformFactory.createInstance()) verifyUnsupportedSparkVersionForShuffleManager(autoTuner, customSparkVersion) } test("test shuffle manager version for missing spark version") { - val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) + val workerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), None, Seq()) val autoTuner = ProfilingAutoTunerConfigsProvider - .buildAutoTunerFromProps(databricksWorkerInfo, + .buildAutoTunerFromProps(workerInfo, infoProvider, PlatformFactory.createInstance()) // Verify that the shuffle manager is not recommended for missing Spark version autoTuner.getShuffleManagerClassName match { case Right(smClassName) => fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => - assert(comment == ShuffleManagerResolver.commentForMissingVersion) + assert(comment == ProfilingAutoTunerConfigsProvider.shuffleManagerCommentForMissingVersion) } } From 81d0c6c659a2b76569ccfb8a28d04e5a800f1643 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 3 Jan 2025 13:53:43 -0800 Subject: [PATCH 5/9] Fix line length Signed-off-by: Partho Sarthi --- .../rapids/tool/tuning/ProfilingAutoTunerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala index f458d2dec..c5f284178 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala @@ -2192,8 +2192,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." expectedSmVersion: String): Unit = { autoTuner.getShuffleManagerClassName match { case Right(smClassName) => - assert(smClassName == - ProfilingAutoTunerConfigsProvider.buildShuffleManagerClassName(expectedSmVersion)) + assert(smClassName == ProfilingAutoTunerConfigsProvider + .buildShuffleManagerClassName(expectedSmVersion)) case Left(comment) => fail(s"Expected valid RapidsShuffleManager but got comment: $comment") } @@ -2254,8 +2254,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." case Right(smClassName) => fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => - assert(comment == - ProfilingAutoTunerConfigsProvider.shuffleManagerCommentForUnsupportedVersion(sparkVersion)) + assert(comment == ProfilingAutoTunerConfigsProvider + .shuffleManagerCommentForUnsupportedVersion(sparkVersion)) } } From 782e82a38c8b1f1e7399a5e42a813d49b80329ec Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 3 Jan 2025 14:23:33 -0800 Subject: [PATCH 6/9] Update comment for unsupported version Signed-off-by: Partho Sarthi --- .../com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index f6b66dd7f..249e058a3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -1468,8 +1468,10 @@ trait AutoTunerConfigsProvider extends Logging { } def shuffleManagerCommentForUnsupportedVersion(sparkVersion: String): String = { - s"Cannot recommend RAPIDS Shuffle Manager for unsupported \'$sparkVersion\' version.\n" + - s" See supported versions: $shuffleManagerDocUrl." + s"Cannot recommend RAPIDS Shuffle Manager for unsupported '$sparkVersion' version.\n" + + " To enable RAPIDS Shuffle Manager, set 'spark.shuffle.manager' to a value\n" + + " from the supported versions. \n" + + s" See supported versions: $shuffleManagerDocUrl." } def shuffleManagerCommentForMissingVersion: String = { From 2a84edad48e34c4a5693d9bfcafddddbaaef878a Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 3 Jan 2025 14:40:43 -0800 Subject: [PATCH 7/9] Refactor and update comments Signed-off-by: Partho Sarthi --- .../spark/rapids/tool/tuning/AutoTuner.scala | 25 +++++++++---------- .../tool/tuning/ProfilingAutoTunerSuite.scala | 7 +++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index 249e058a3..b05cbda61 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -771,10 +771,10 @@ class AutoTuner( case Some(smVersion) => Right(autoTunerConfigsProvider.buildShuffleManagerClassName(smVersion)) case None => - Left(autoTunerConfigsProvider.shuffleManagerCommentForUnsupportedVersion(sparkVersion)) + Left(autoTunerConfigsProvider.shuffleManagerComments("unsupported")(sparkVersion)) } case None => - Left(autoTunerConfigsProvider.shuffleManagerCommentForMissingVersion) + Left(autoTunerConfigsProvider.shuffleManagerComments("missing")("")) } } @@ -1346,6 +1346,16 @@ trait AutoTunerConfigsProvider extends Logging { private val shuffleManagerDocUrl = "https://docs.nvidia.com/spark-rapids/user-guide/latest/" + "additional-functionality/rapids-shuffle.html#rapids-shuffle-manager" + val shuffleManagerComments: Map[String, String => String] = Map( + "unsupported" -> ((sparkVersion: String) => + s"Cannot recommend RAPIDS Shuffle Manager for unsupported '$sparkVersion' version.\n" + + " To enable RAPIDS Shuffle Manager, use a supported Spark version and set \n" + + " 'spark.shuffle.manager' to a valid RAPIDS Shuffle Manager version. \n" + + s" See supported versions: $shuffleManagerDocUrl."), + "missing" -> (_ => + "Could not recommend RapidsShuffleManager as Spark version cannot be determined.") + ) + /** * Abstract method to create an instance of the AutoTuner. */ @@ -1466,17 +1476,6 @@ trait AutoTunerConfigsProvider extends Logging { def buildShuffleManagerClassName(smVersion: String): String = { s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager" } - - def shuffleManagerCommentForUnsupportedVersion(sparkVersion: String): String = { - s"Cannot recommend RAPIDS Shuffle Manager for unsupported '$sparkVersion' version.\n" + - " To enable RAPIDS Shuffle Manager, set 'spark.shuffle.manager' to a value\n" + - " from the supported versions. \n" + - s" See supported versions: $shuffleManagerDocUrl." - } - - def shuffleManagerCommentForMissingVersion: String = { - "Could not recommend RapidsShuffleManager as Spark version cannot be determined." - } } /** diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala index c5f284178..3a1b2d824 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala @@ -2254,8 +2254,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." case Right(smClassName) => fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => - assert(comment == ProfilingAutoTunerConfigsProvider - .shuffleManagerCommentForUnsupportedVersion(sparkVersion)) + assert(comment == + ProfilingAutoTunerConfigsProvider.shuffleManagerComments("unsupported")(sparkVersion)) } } @@ -2314,7 +2314,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." case Right(smClassName) => fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => - assert(comment == ProfilingAutoTunerConfigsProvider.shuffleManagerCommentForMissingVersion) + assert(comment == + ProfilingAutoTunerConfigsProvider.shuffleManagerComments("missing")("")) } } From 790c8b9002b616cbdc1da3f0c8742ec3cac910b7 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 6 Jan 2025 10:48:59 -0800 Subject: [PATCH 8/9] Revert "Refactor and update comments" This reverts commit 2a84edad48e34c4a5693d9bfcafddddbaaef878a. --- .../spark/rapids/tool/tuning/AutoTuner.scala | 25 ++++++++++--------- .../tool/tuning/ProfilingAutoTunerSuite.scala | 7 +++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index b05cbda61..249e058a3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -771,10 +771,10 @@ class AutoTuner( case Some(smVersion) => Right(autoTunerConfigsProvider.buildShuffleManagerClassName(smVersion)) case None => - Left(autoTunerConfigsProvider.shuffleManagerComments("unsupported")(sparkVersion)) + Left(autoTunerConfigsProvider.shuffleManagerCommentForUnsupportedVersion(sparkVersion)) } case None => - Left(autoTunerConfigsProvider.shuffleManagerComments("missing")("")) + Left(autoTunerConfigsProvider.shuffleManagerCommentForMissingVersion) } } @@ -1346,16 +1346,6 @@ trait AutoTunerConfigsProvider extends Logging { private val shuffleManagerDocUrl = "https://docs.nvidia.com/spark-rapids/user-guide/latest/" + "additional-functionality/rapids-shuffle.html#rapids-shuffle-manager" - val shuffleManagerComments: Map[String, String => String] = Map( - "unsupported" -> ((sparkVersion: String) => - s"Cannot recommend RAPIDS Shuffle Manager for unsupported '$sparkVersion' version.\n" + - " To enable RAPIDS Shuffle Manager, use a supported Spark version and set \n" + - " 'spark.shuffle.manager' to a valid RAPIDS Shuffle Manager version. \n" + - s" See supported versions: $shuffleManagerDocUrl."), - "missing" -> (_ => - "Could not recommend RapidsShuffleManager as Spark version cannot be determined.") - ) - /** * Abstract method to create an instance of the AutoTuner. */ @@ -1476,6 +1466,17 @@ trait AutoTunerConfigsProvider extends Logging { def buildShuffleManagerClassName(smVersion: String): String = { s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager" } + + def shuffleManagerCommentForUnsupportedVersion(sparkVersion: String): String = { + s"Cannot recommend RAPIDS Shuffle Manager for unsupported '$sparkVersion' version.\n" + + " To enable RAPIDS Shuffle Manager, set 'spark.shuffle.manager' to a value\n" + + " from the supported versions. \n" + + s" See supported versions: $shuffleManagerDocUrl." + } + + def shuffleManagerCommentForMissingVersion: String = { + "Could not recommend RapidsShuffleManager as Spark version cannot be determined." + } } /** diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala index 3a1b2d824..c5f284178 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala @@ -2254,8 +2254,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." case Right(smClassName) => fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => - assert(comment == - ProfilingAutoTunerConfigsProvider.shuffleManagerComments("unsupported")(sparkVersion)) + assert(comment == ProfilingAutoTunerConfigsProvider + .shuffleManagerCommentForUnsupportedVersion(sparkVersion)) } } @@ -2314,8 +2314,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." case Right(smClassName) => fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => - assert(comment == - ProfilingAutoTunerConfigsProvider.shuffleManagerComments("missing")("")) + assert(comment == ProfilingAutoTunerConfigsProvider.shuffleManagerCommentForMissingVersion) } } From 4b57134e15d30a31928f428989dfc5fe97519356 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 6 Jan 2025 14:07:53 -0800 Subject: [PATCH 9/9] Update comment to include an example config Signed-off-by: Partho Sarthi --- .../nvidia/spark/rapids/tool/Platform.scala | 9 +++++++++ .../spark/rapids/tool/tuning/AutoTuner.scala | 19 +++++++++++++------ .../tool/tuning/ProfilingAutoTunerSuite.scala | 2 +- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala index 91872e4b1..24cfbfeba 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala @@ -121,6 +121,7 @@ abstract class Platform(var gpuDevice: Option[GpuDevice], val clusterProperties: Option[ClusterProperties]) extends Logging { val platformName: String val defaultGpuDevice: GpuDevice + val sparkVersionLabel: String = "Spark version" // It's not deal to use vars here but to minimize changes and // keep backwards compatibility we put them here for now and hopefully @@ -172,6 +173,13 @@ abstract class Platform(var gpuDevice: Option[GpuDevice], } } + /** + * Identify the latest supported Spark and RapidsShuffleManager version for the platform. + */ + lazy val latestSupportedShuffleManagerInfo: (String, String) = { + supportedShuffleManagerVersionMap.maxBy(_._1) + } + /** * Checks if the given runtime is supported by the platform. */ @@ -555,6 +563,7 @@ abstract class Platform(var gpuDevice: Option[GpuDevice], abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice], clusterProperties: Option[ClusterProperties]) extends Platform(gpuDevice, clusterProperties) { override val defaultGpuDevice: GpuDevice = T4Gpu + override val sparkVersionLabel: String = "Databricks runtime" override def isPlatformCSP: Boolean = true override val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set( diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala index 249e058a3..048e71ada 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala @@ -771,7 +771,8 @@ class AutoTuner( case Some(smVersion) => Right(autoTunerConfigsProvider.buildShuffleManagerClassName(smVersion)) case None => - Left(autoTunerConfigsProvider.shuffleManagerCommentForUnsupportedVersion(sparkVersion)) + Left(autoTunerConfigsProvider.shuffleManagerCommentForUnsupportedVersion( + sparkVersion, platform)) } case None => Left(autoTunerConfigsProvider.shuffleManagerCommentForMissingVersion) @@ -1467,11 +1468,17 @@ trait AutoTunerConfigsProvider extends Logging { s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager" } - def shuffleManagerCommentForUnsupportedVersion(sparkVersion: String): String = { - s"Cannot recommend RAPIDS Shuffle Manager for unsupported '$sparkVersion' version.\n" + - " To enable RAPIDS Shuffle Manager, set 'spark.shuffle.manager' to a value\n" + - " from the supported versions. \n" + - s" See supported versions: $shuffleManagerDocUrl." + def shuffleManagerCommentForUnsupportedVersion( + sparkVersion: String, platform: Platform): String = { + val (latestSparkVersion, latestSmVersion) = platform.latestSupportedShuffleManagerInfo + // scalastyle:off line.size.limit + s""" + |Cannot recommend RAPIDS Shuffle Manager for unsupported ${platform.sparkVersionLabel}: '$sparkVersion'. + |To enable RAPIDS Shuffle Manager, use a supported ${platform.sparkVersionLabel} (e.g., '$latestSparkVersion') + |and set: '--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark$latestSmVersion.RapidsShuffleManager'. + |See supported versions: $shuffleManagerDocUrl. + |""".stripMargin.trim.replaceAll("\n", "\n ") + // scalastyle:on line.size.limit } def shuffleManagerCommentForMissingVersion: String = { diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala index c5f284178..8b1007a9c 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala @@ -2255,7 +2255,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." fail(s"Expected error comment but got valid RapidsShuffleManager: $smClassName") case Left(comment) => assert(comment == ProfilingAutoTunerConfigsProvider - .shuffleManagerCommentForUnsupportedVersion(sparkVersion)) + .shuffleManagerCommentForUnsupportedVersion(sparkVersion, autoTuner.platform)) } }