Skip to content

Commit

Permalink
Add recommendations in AutoTuner from driver logs
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Nov 21, 2023
1 parent 0ff873b commit c128919
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ class RecommendationEntry(val name: String,
class AutoTuner(
val clusterProps: ClusterProperties,
val appInfoProvider: AppSummaryInfoBaseProvider,
val platform: Platform) extends Logging {
val platform: Platform,
unsupportedOperators: Seq[DriverLogUnsupportedOperators]) extends Logging {

import AutoTuner._

Expand All @@ -350,11 +351,9 @@ class AutoTuner(
}

def getPropertyValue(key: String): Option[String] = {
val fromProfile = appInfoProvider.getProperty(key)
fromProfile match {
case None => Option(clusterProps.softwareProperties.get(key))
case Some(_) => fromProfile
}
val fromProfile = Option(appInfoProvider).flatMap(_.getProperty(key))
// If the value is not found above, fallback to cluster properties
fromProfile.orElse(Option(clusterProps.softwareProperties.get(key)))
}

def initRecommendations(): Unit = {
Expand Down Expand Up @@ -819,6 +818,23 @@ class AutoTuner(
appendRecommendation("spark.sql.shuffle.partitions", s"$shufflePartitions")
}

/**
* Analyzes unsupported driver logs and generates recommendations for configuration properties.
*/
/**
 * Analyzes unsupported operators reported in the driver logs and appends
 * recommendations for configuration properties known to work around them.
 *
 * For each unsupported-operator reason that mentions a config key from
 * `recommendationsFromDriverLogs`, the recommended value is appended together
 * with a comment warning that the GPU results may differ from the CPU.
 */
private def recommendFromDriverLogs(): Unit = {
  // lowerCamelCase per Scala convention; interpolated result is unchanged
  val docUrl = "https://nvidia.github.io/spark-rapids/docs/additional-functionality/" +
    "advanced_configs.html#advanced-configuration"
  unsupportedOperators.foreach { operator =>
    // foreach (not collect) — we only perform side effects, no collection is needed
    recommendationsFromDriverLogs.foreach { case (config, recommendedValue) =>
      if (operator.reason.contains(config)) {
        appendRecommendation(config, recommendedValue)
        appendComment(s"Using $config does not guarantee to produce the same results as CPU. " +
          s"Please refer to $docUrl")
      }
    }
  }
}

def appendOptionalComment(lookup: String, comment: String): Unit = {
if (!skippedRecommendations.contains(lookup)) {
appendComment(comment)
Expand Down Expand Up @@ -921,6 +937,9 @@ class AutoTuner(
case (property, value) => appendRecommendation(property, value)
}
}
if(unsupportedOperators.nonEmpty) {
recommendFromDriverLogs()
}
(toRecommendationsProfileResult, toCommentProfileResult)
}
}
Expand Down Expand Up @@ -1017,15 +1036,22 @@ object AutoTuner extends Logging {
" If the Spark RAPIDS jar is being bundled with your Spark\n" +
" distribution, this step is not needed.")
)

// Config keys (with their recommended values) to suggest when a driver-log
// "unsupported operator" reason mentions them; consumed by recommendFromDriverLogs.
private val recommendationsFromDriverLogs: Map[String, String] = Map(
"spark.rapids.sql.incompatibleDateFormats.enabled" -> "true"
)

// the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r

private def handleException(
ex: Exception,
appInfo: AppSummaryInfoBaseProvider,
platform: Platform): AutoTuner = {
platform: Platform,
unsupportedOperators: Seq[DriverLogUnsupportedOperators]): AutoTuner = {
logError("Exception: " + ex.getStackTrace.mkString("Array(", ", ", ")"))
val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform)
val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform, unsupportedOperators)
val msg = ex match {
case cEx: ConstructorException => cEx.getContext
case _ => if (ex.getCause != null) ex.getCause.toString else ex.toString
Expand Down Expand Up @@ -1075,26 +1101,30 @@ object AutoTuner extends Logging {
def buildAutoTunerFromProps(
clusterProps: String,
singleAppProvider: AppSummaryInfoBaseProvider,
platform: Platform = PlatformFactory.getDefault): AutoTuner = {
platform: Platform = PlatformFactory.getDefault,
unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
try {
val clusterPropsOpt = loadClusterPropertiesFromContent(clusterProps)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
unsupportedOperators)
} catch {
case e: Exception =>
handleException(e, singleAppProvider, platform)
handleException(e, singleAppProvider, platform, unsupportedOperators)
}
}

def buildAutoTuner(
filePath: String,
singleAppProvider: AppSummaryInfoBaseProvider,
platform: Platform = PlatformFactory.getDefault): AutoTuner = {
platform: Platform = PlatformFactory.getDefault,
unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
try {
val clusterPropsOpt = loadClusterProps(filePath)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
unsupportedOperators)
} catch {
case e: Exception =>
handleException(e, singleAppProvider, platform)
handleException(e, singleAppProvider, platform, unsupportedOperators)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ object ProfileMain extends Logging {
}

val profiler = new Profiler(hadoopConf, appArgs, enablePB)
profiler.profile(eventLogFsFiltered)
if (driverLog.nonEmpty){
profiler.profileDriver(driverLog)
if (driverLog.nonEmpty) {
profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
}
profiler.profile(eventLogFsFiltered)
(0, filteredLogs.size)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea

private val useAutoTuner: Boolean = appArgs.autoTuner()
private var progressBar: Option[ConsoleProgressBar] = None
private var unsupportedDriverOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty

logInfo(s"Threadpool size is $nThreads")

Expand Down Expand Up @@ -124,15 +125,19 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
progressBar.foreach(_.finishAll())
}

def profileDriver(driverLogInfos: String): Unit = {
def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = {
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
Profiler.DRIVER_LOG_NAME, numOutputRows, true)

try {
val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
val unsupportedDrivers = driverLogProcessor.processDriverLog()
unsupportedDriverOperators = driverLogProcessor.processDriverLog()
profileOutputWriter.write(s"Unsupported operators in driver log",
unsupportedDrivers)
unsupportedDriverOperators)
if (eventLogsEmpty && useAutoTuner) {
val (properties, comments) = runAutoTuner(profileOutputWriter, None)
profileOutputWriter.writeText("\n### A. Recommended Configuration ###\n")
profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
}
} finally {
profileOutputWriter.close()
}
Expand Down Expand Up @@ -403,6 +408,27 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
appLogPath, ioAnalysisMetrics), compareRes)
}

/**
 * Runs the AutoTuner for the given application summary (or none, when tuning
 * purely from driver-log information) and returns the recommended properties
 * and accompanying comments.
 *
 * NOTE(review): this method does not write anything itself — callers format and
 * write the returned results to their own writer. `profileOutputWriter` is not
 * referenced in the body; confirm whether the parameter can be dropped.
 *
 * @param profileOutputWriter The output writer the caller uses for the results
 *                            (currently unused by this method).
 * @param appInfo Optional summary of the application for tuning; `None` when
 *                only driver-log data is available.
 * @return The recommended Spark properties and the AutoTuner's comments.
 */
private def runAutoTuner(profileOutputWriter: ProfileOutputWriter,
appInfo: Option[ApplicationSummaryInfo]): (Seq[RecommendedPropertyResult],
Seq[RecommendedCommentResult]) = {
// orNull: AutoTuner.getPropertyValue wraps the provider in Option, so a null
// provider simply falls back to cluster properties
val appInfoProvider = appInfo.map(new SingleAppSummaryInfoProvider(_)).orNull
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val platform = appArgs.platform()
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath, appInfoProvider,
PlatformFactory.createInstance(platform), unsupportedDriverOperators)

// The autotuner allows skipping some properties,
// e.g., getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
// recommendation related to executor instances.
autoTuner.getRecommendedProperties()
}

def writeOutput(profileOutputWriter: ProfileOutputWriter,
appsSum: Seq[ApplicationSummaryInfo], outputCombined: Boolean,
comparedRes: Option[CompareSummaryInfo] = None): Unit = {
Expand Down Expand Up @@ -464,7 +490,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
} else {
appsSum
}
sums.foreach { app =>
sums.foreach { app: ApplicationSummaryInfo =>
profileOutputWriter.writeText("### A. Information Collected ###")
profileOutputWriter.write("Application Information", app.appInfo)
profileOutputWriter.write("Application Log Path Mapping", app.appLogPath)
Expand Down Expand Up @@ -510,15 +536,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
Some("Unsupported SQL Ops"))

if (useAutoTuner) {
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val platform = appArgs.platform()
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath,
new SingleAppSummaryInfoProvider(app),
PlatformFactory.createInstance(platform))
// the autotuner allows skipping some properties
// e.g. getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
// recommendation related to executor instances.
val (properties, comments) = autoTuner.getRecommendedProperties()
val (properties, comments) = runAutoTuner(profileOutputWriter, Some(app))
profileOutputWriter.writeText("\n### D. Recommended Configuration ###\n")
profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
}
Expand Down

0 comments on commit c128919

Please sign in to comment.