[FEA] Profiler autotuner should only specify standard Spark versions for shuffle manager setting #662

Merged · 5 commits · Dec 28, 2023

Changes from 3 commits
```diff
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.util.matching.Regex

-import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory}
+import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory, PlatformNames}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.yaml.snakeyaml.{DumperOptions, LoaderOptions, Yaml}
```
```diff
@@ -591,9 +591,8 @@ class AutoTuner(
   }

   def calculateJobLevelRecommendations(): Unit = {
-    val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
-    appendRecommendation("spark.shuffle.manager",
-      "com.nvidia.spark.rapids.spark" + shuffleManagerVersion + ".RapidsShuffleManager")
+    val smClassName = getShuffleManagerClassName
+    appendRecommendation("spark.shuffle.manager", smClassName)
     appendComment(classPathComments("rapids.shuffle.jars"))

     recommendFileCache()
@@ -603,6 +602,21 @@ class AutoTuner(
     recommendClassPathEntries()
   }
```

```diff
+  def getShuffleManagerClassName() : String = {
+    val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
+    val finalShuffleVersion : String = if (platform.getName == PlatformNames.DATABRICKS_AWS
+        || platform.getName == PlatformNames.DATABRICKS_AZURE) {
+      val dbVersion = appInfoProvider.getProperty(
+        "spark.databricks.clusterUsageTags.sparkVersion").getOrElse("")
+      dbVersion match {
```
Collaborator:
For the if-condition, I would think that we should not rely on the platform input. Instead, we should look for a property that distinguishes event logs generated on Databricks. For example:

  • If the configuration spark.databricks.clusterUsageTags.sparkVersion is always set for all Databricks executions, then all we need to do is check that this config entry is not empty.
  • If the above is not always true, then we can pattern match on the Spark version pulled from the app configuration to see whether it indicates a Databricks Spark.

Deciding the platform from the configuration is going to be a good step toward improving the analysis; we are going to need that anyway.
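
A minimal sketch of the first option, under the assumption that the property is always present on Databricks runs (the helper name isDatabricksRun is hypothetical, not part of this PR):

```scala
// Hypothetical helper inside AutoTuner: treat a non-empty
// spark.databricks.clusterUsageTags.sparkVersion entry as the signal that the
// event log came from Databricks, instead of trusting the platform argument.
def isDatabricksRun: Boolean = {
  appInfoProvider
    .getProperty("spark.databricks.clusterUsageTags.sparkVersion")
    .exists(_.nonEmpty)
}
```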

Author:
Yeah, I sort of alluded to that when I first put the PR up. I like trying out option #1 above. Will update.

Author:
Addressed

case ver if ver.contains("10.4") => "321db"
case ver if ver.contains("11.3") => "330db"
case _ => "332db"
Collaborator:
This is a good start; we can have this list hardcoded for this PR. However, moving forward we will have to maintain this code every time a new DB class is added.

Perhaps later we can create a static properties file to map between DB and plugin versions? At least we won't need to change Scala code for that. Also, we could automate it with a script that runs whenever a new class is added in the plugin repo. WDYT?
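
A rough sketch of that properties-file idea, not part of this PR (the resource name databricks-shuffle-versions.properties and its key/value format are assumptions):

```scala
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical static mapping shipped as a bundled resource with lines like
//   10.4=321db
//   11.3=330db
// so a new Databricks release only needs a data change, not a Scala change.
def loadDbShuffleVersionMap(): Map[String, String] = {
  val props = new Properties()
  Option(getClass.getResourceAsStream("/databricks-shuffle-versions.properties"))
    .foreach { in =>
      try props.load(in) finally in.close()
    }
  props.asScala.toMap
}
```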

Author:
Completely agree. This needs to be maintained somewhere. Do we have examples of static properties files elsewhere at the moment? If not, we can start with this property. Automating is more interesting, as adding a new class may not be a complete indicator that we need a new config key; DB in particular might fit that use case.

Collaborator:
I planted the seed for this infrastructure in the existing PR #705 (loadPropertiesFile). My plan is to build on that by adding a configurable trait that will load the properties from a static file.
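
Since PR #705 is not shown here, the trait can only be guessed at; a sketch of the rough shape (all names hypothetical):

```scala
// Hypothetical trait: components mix this in to load their static
// configuration from a bundled properties file.
trait Configurable {
  /** Resource path of the static properties file backing this component. */
  def propertiesPath: String

  /** Loads the key/value pairs, e.g. by delegating to loadPropertiesFile. */
  def loadConfig(): Map[String, String]
}
```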

Author:
Should I address the change here as part of this PR, or are we good with this for now?

Collaborator:
This is good for this PR.

```diff
+      }
+    } else shuffleManagerVersion
```
Collaborator:
It would be more readable as:

```scala
} else {
  shuffleManagerVersion
}
```

Author:
Addressed.

"com.nvidia.spark.rapids.spark" + finalShuffleVersion + ".RapidsShuffleManager"
}

/**
* Checks whether the cluster properties are valid.
* If the cluster worker-info is missing entries (i.e., CPU and GPU count), it sets the entries
Expand Down
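
For readability, here is the new method assembled from the hunks above, as it stands at this commit (the brace style flagged in review is addressed in a later commit; the inline comments are added here for explanation):

```scala
def getShuffleManagerClassName() : String = {
  val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
  val finalShuffleVersion : String = if (platform.getName == PlatformNames.DATABRICKS_AWS
      || platform.getName == PlatformNames.DATABRICKS_AZURE) {
    // On Databricks, the runtime tag (e.g. 11.3.x-gpu-ml-scala2.12) decides
    // the shuffle manager version, not the raw Spark version.
    val dbVersion = appInfoProvider.getProperty(
      "spark.databricks.clusterUsageTags.sparkVersion").getOrElse("")
    dbVersion match {
      case ver if ver.contains("10.4") => "321db"
      case ver if ver.contains("11.3") => "330db"
      case _ => "332db"
    }
  } else shuffleManagerVersion
  "com.nvidia.spark.rapids.spark" + finalShuffleVersion + ".RapidsShuffleManager"
}
```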
```diff
@@ -88,7 +88,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     val systemProperties = customProps match {
       case None => mutable.Map[String, String]()
       case Some(newProps) => newProps
-      }
+    }
     val convertedMap = new util.LinkedHashMap[String, String](systemProperties.asJava)
     val clusterProps = new ClusterProperties(cpuSystem, gpuWorkerProps, convertedMap)
     // set the options to convert the object into formatted yaml content
```
```diff
@@ -654,7 +654,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     assert(expectedResults == autoTunerOutput)
   }

-  test("test AutoTuner with empty sparkProperties" ) {
+  test("test AutoTuner with empty sparkProperties") {
     val dataprocWorkerInfo = buildWorkerInfoAsString(None)
     val expectedResults =
       s"""|
```
```diff
@@ -1507,4 +1507,20 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     // scalastyle:on line.size.limit
     assert(expectedResults == autoTunerOutput)
   }
+
+  test("test shuffle manager version for databricks") {
+    val customProps = mutable.LinkedHashMap(
+      "spark.databricks.clusterUsageTags.sparkVersion" -> "11.3.x-gpu-ml-scala2.12")
+    val databricksWorkerInfo = buildWorkerInfoAsString(Some(customProps))
+    val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
+      mutable.Map("spark.rapids.sql.enabled" -> "true",
+        "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin",
+        "spark.databricks.clusterUsageTags.sparkVersion" -> "11.3.x-gpu-ml-scala2.12"),
+      Some("3.3.0"), Seq())
+    val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo,
+      infoProvider, PlatformFactory.createInstance(PlatformNames.DATABRICKS_AZURE))
+    val smVersion = autoTuner.getShuffleManagerClassName()
+    // Assert shuffle manager string for DB 11.3 tag
+    assert(smVersion == "com.nvidia.spark.rapids.spark330db.RapidsShuffleManager")
+  }
 }
```