
[FEA] Profiler autotuner should only specify standard Spark versions for shuffle manager setting #662

Merged · 5 commits · Dec 28, 2023
@@ -591,9 +591,8 @@ class AutoTuner(
}

def calculateJobLevelRecommendations(): Unit = {
val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
appendRecommendation("spark.shuffle.manager",
"com.nvidia.spark.rapids.spark" + shuffleManagerVersion + ".RapidsShuffleManager")
val smClassName = getShuffleManagerClassName
appendRecommendation("spark.shuffle.manager", smClassName)
appendComment(classPathComments("rapids.shuffle.jars"))

recommendFileCache()
@@ -603,6 +602,22 @@ class AutoTuner(
recommendClassPathEntries()
}

def getShuffleManagerClassName() : String = {
val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
val dbVersion = appInfoProvider.getProperty(
"spark.databricks.clusterUsageTags.sparkVersion").getOrElse("")
val finalShuffleVersion : String = if (dbVersion.nonEmpty) {
dbVersion match {
case ver if ver.contains("10.4") => "321db"
case ver if ver.contains("11.3") => "330db"
case _ => "332db"
Collaborator:
This is a good start. We can keep this list hardcoded for this PR.
However, moving forward, this code will need to be maintained every time a new DB class is added.

Perhaps later we can create a static properties file to map between DB and plugin versions? At least we won't need to change Scala code for that. Also, we could automate it with a script whenever a new class is added in the plugin repo. WDYT?

Collaborator Author:

Completely agree. This needs to be maintained somewhere. Do we have examples of static properties files elsewhere at the moment? If not, we can start with this property. Automating is more interesting, as adding a new class may not be a complete indicator that we need a new config key; DB in particular might fit that use case.

Collaborator:
I planted the seed for this infrastructure in the existing PR #705 (loadPropertiesFile).
My plan is to build on that by adding a configurable trait that will load the properties file from a static file.

Collaborator Author:
Should I address the change here as part of this PR or are we good with this for now?

Collaborator:
This is good for this PR.

}
} else {
shuffleManagerVersion
}
"com.nvidia.spark.rapids.spark" + finalShuffleVersion + ".RapidsShuffleManager"
}

/**
* Checks whether the cluster properties are valid.
* If the cluster worker-info is missing entries (i.e., CPU and GPU count), it sets the entries
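The properties-file idea discussed in the review thread above could be sketched as follows. This is a hypothetical illustration, not code from the PR: the object name, the hardcoded fallback map (which a real implementation would load from a static properties file, e.g. via `loadPropertiesFile` from PR #705), and the method signature are all assumptions; only the version-to-suffix pairs and the class-name format come from the diff.

```scala
// Hypothetical sketch: resolving the RapidsShuffleManager class name from a
// data-driven mapping instead of hardcoded match cases in Scala code.
object ShuffleManagerLookup {
  // In a real implementation this map would be loaded from a static
  // properties file, so new DB runtimes need no Scala code changes.
  private val dbVersionToSuffix: Seq[(String, String)] = Seq(
    "10.4" -> "321db",
    "11.3" -> "330db"
  )
  private val defaultDbSuffix = "332db"

  def classNameFor(sparkVersion: String, dbVersion: String): String = {
    val suffix =
      if (dbVersion.nonEmpty) {
        // Pick the first mapping whose key occurs in the DB runtime tag,
        // e.g. "11.3.x-gpu-ml-scala2.12" -> "330db".
        dbVersionToSuffix.collectFirst {
          case (ver, sfx) if dbVersion.contains(ver) => sfx
        }.getOrElse(defaultDbSuffix)
      } else {
        // Non-Databricks: strip "(", ")", "." from the Spark version,
        // mirroring filterNot("().".toSet) in the diff, e.g. "3.3.0" -> "330".
        sparkVersion.filterNot("().".toSet)
      }
    s"com.nvidia.spark.rapids.spark$suffix.RapidsShuffleManager"
  }
}
```

A table-driven lookup like this keeps the version policy in data rather than control flow, which is what makes the later move to an external properties file a small change.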
@@ -88,7 +88,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
val systemProperties = customProps match {
case None => mutable.Map[String, String]()
case Some(newProps) => newProps
}
}
val convertedMap = new util.LinkedHashMap[String, String](systemProperties.asJava)
val clusterProps = new ClusterProperties(cpuSystem, gpuWorkerProps, convertedMap)
// set the options to convert the object into formatted yaml content
@@ -654,7 +654,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(expectedResults == autoTunerOutput)
}

test("test AutoTuner with empty sparkProperties" ) {
test("test AutoTuner with empty sparkProperties") {
val dataprocWorkerInfo = buildWorkerInfoAsString(None)
val expectedResults =
s"""|
@@ -1507,4 +1507,31 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("test shuffle manager version for databricks") {
val databricksWorkerInfo = buildWorkerInfoAsString(None)
val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
mutable.Map("spark.rapids.sql.enabled" -> "true",
"spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin",
"spark.databricks.clusterUsageTags.sparkVersion" -> "11.3.x-gpu-ml-scala2.12"),
Some("3.3.0"), Seq())
// Do not set the platform to Databricks, to verify this works correctly regardless of platform
val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo,
infoProvider, PlatformFactory.createInstance())
val smVersion = autoTuner.getShuffleManagerClassName()
// Assert shuffle manager string for DB 11.3 tag
assert(smVersion == "com.nvidia.spark.rapids.spark330db.RapidsShuffleManager")
}

test("test shuffle manager version for non-databricks") {
val databricksWorkerInfo = buildWorkerInfoAsString(None)
val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
mutable.Map("spark.rapids.sql.enabled" -> "true",
"spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"),
Some("3.3.0"), Seq())
val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo,
infoProvider, PlatformFactory.createInstance())
val smVersion = autoTuner.getShuffleManagerClassName()
assert(smVersion == "com.nvidia.spark.rapids.spark330.RapidsShuffleManager")
}
}