From e612d79426d80a4d7b48a3492bb4c8b526aff6df Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Mon, 20 Nov 2023 10:00:38 -0800
Subject: [PATCH] Unified platform handling and fetching of operator score files (#661)

* Handle incorrect platform in core tools

Signed-off-by: Partho Sarthi

* Add platform factory

Signed-off-by: Partho Sarthi

* Add comments for default GPU types.

Signed-off-by: Partho Sarthi

* Improve platform names in documentation

Signed-off-by: Partho Sarthi

* Add docs

Signed-off-by: Partho Sarthi

* Replace Map with pattern matching and rename PlatformTypes to PlatformNames

Signed-off-by: Partho Sarthi

* Rename argument to platformName

Signed-off-by: Partho Sarthi

---------

Signed-off-by: Partho Sarthi
---
 ...orsScore.csv => operatorsScore-onprem.csv} |   0
 .../nvidia/spark/rapids/tool/Platform.scala   | 156 ++++++++++++++++++
 .../rapids/tool/profiling/AutoTuner.scala     |   8 +-
 .../rapids/tool/profiling/Platform.scala      |  83 ----------
 .../rapids/tool/profiling/ProfileArgs.scala   |   6 +-
 .../rapids/tool/profiling/Profiler.scala      |   4 +-
 .../qualification/PluginTypeChecker.scala     |  26 +--
 .../qualification/QualificationArgs.scala     |   7 +-
 .../tool/profiling/AutoTunerSuite.scala       |   4 +-
 9 files changed, 170 insertions(+), 124 deletions(-)
 rename core/src/main/resources/{operatorsScore.csv => operatorsScore-onprem.csv} (100%)
 create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
 delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Platform.scala

diff --git a/core/src/main/resources/operatorsScore.csv b/core/src/main/resources/operatorsScore-onprem.csv
similarity index 100%
rename from core/src/main/resources/operatorsScore.csv
rename to core/src/main/resources/operatorsScore-onprem.csv
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
new file mode 100644
index 000000000..b38b7db14
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.nvidia.spark.rapids.tool
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Utility object containing constants for various platform names.
+ */
+object PlatformNames {
+  val DATABRICKS_AWS = "databricks-aws"
+  val DATABRICKS_AZURE = "databricks-azure"
+  val DATAPROC = "dataproc"
+  val DATAPROC_GKE_L4 = "dataproc-gke-l4"
+  val DATAPROC_GKE_T4 = "dataproc-gke-t4"
+  val DATAPROC_L4 = "dataproc-l4"
+  val DATAPROC_SL_L4 = "dataproc-serverless-l4"
+  val DATAPROC_T4 = "dataproc-t4"
+  val EMR = "emr"
+  val EMR_A10 = "emr-a10"
+  val EMR_T4 = "emr-t4"
+  val ONPREM = "onprem"
+
+  /**
+   * Return a list of all platform names.
+   */
+  def getAllNames: List[String] = List(
+    DATABRICKS_AWS, DATABRICKS_AZURE, DATAPROC, DATAPROC_GKE_L4, DATAPROC_GKE_T4,
+    DATAPROC_L4, DATAPROC_SL_L4, DATAPROC_T4, EMR, EMR_A10, EMR_T4, ONPREM
+  )
+}
+
+/**
+ * Represents a platform and its associated recommendations.
+ *
+ * @param platformName Name of the platform. See [[PlatformNames]] for supported platform names.
+ */
+class Platform(platformName: String) {
+  /**
+   * Recommendations to be excluded from the list of recommendations.
+   * These have the highest priority.
+   */
+  val recommendationsToExclude: Seq[String] = Seq.empty
+  /**
+   * Recommendations to be included in the final list of recommendations.
+   * These properties should be specific to the platform and not general Spark properties.
+   * For example: "spark.databricks.optimizer.dynamicFilePruning" for the Databricks platform.
+   *
+   * Represented as a tuple of (propertyKey, propertyValue).
+   */
+  val recommendationsToInclude: Seq[(String, String)] = Seq.empty
+  /**
+   * Dynamically calculates the recommendation for a specific Spark property by invoking
+   * the appropriate function based on `sparkProperty`.
+   * TODO: Implement this function and integrate with existing code in AutoTuner
+   *
+   * @param sparkProperty The Spark property for which the recommendation is calculated.
+   * @param args Variable list of arguments passed to the calculation function for dynamic
+   *             processing.
+   * @return Optional string containing the recommendation, or `None` if unavailable.
+   */
+  def getRecommendation(sparkProperty: String, args: Any*): Option[String] = None
+
+  /**
+   * Checks if the `property` is valid:
+   * 1. It should not be in the exclusion list, OR
+   * 2. It should be in the inclusion list.
+   */
+  def isValidRecommendation(property: String): Boolean = {
+    !recommendationsToExclude.contains(property) ||
+      recommendationsToInclude.map(_._1).contains(property)
+  }
+
+  /**
+   * Checks if the `comment` is valid:
+   * 1. It should not contain any property from the exclusion list.
+   */
+  def isValidComment(comment: String): Boolean = {
+    recommendationsToExclude.forall(excluded => !comment.contains(excluded))
+  }
+
+  def getOperatorScoreFile: String = {
+    s"operatorsScore-$platformName.csv"
+  }
+}
+
+class DatabricksPlatform(platformType: String) extends Platform(platformType) {
+  override val recommendationsToExclude: Seq[String] = Seq(
+    "spark.executor.cores",
+    "spark.executor.instances",
+    "spark.executor.memory",
+    "spark.executor.memoryOverhead"
+  )
+  override val recommendationsToInclude: Seq[(String, String)] = Seq(
+    ("spark.databricks.optimizer.dynamicFilePruning", "false")
+  )
+}
+
+class DataprocPlatform(platformType: String) extends Platform(platformType)
+
+class EmrPlatform(platformType: String) extends Platform(platformType)
+
+class OnPremPlatform extends Platform(PlatformNames.ONPREM)
+
+/**
+ * Factory for creating instances of the supported platforms.
+ */
+object PlatformFactory extends Logging {
+  /**
+   * Creates an instance of a platform based on the specified platform key.
+   * If the platform key is not defined, returns an instance of the onprem platform.
+   *
+   * @param platformKey The key representing the desired platform.
+   * @return An instance of the specified platform.
+   * @throws IllegalArgumentException if the specified platform key is not supported.
+   */
+  def createInstance(platformKey: String): Platform = {
+    platformKey match {
+      case PlatformNames.DATABRICKS_AWS => new DatabricksPlatform(PlatformNames.DATABRICKS_AWS)
+      case PlatformNames.DATABRICKS_AZURE => new DatabricksPlatform(PlatformNames.DATABRICKS_AZURE)
+      case PlatformNames.DATAPROC | PlatformNames.DATAPROC_T4 =>
+        // if no GPU specified, then default to dataproc-t4 for backward compatibility
+        new DataprocPlatform(PlatformNames.DATAPROC_T4)
+      case PlatformNames.DATAPROC_L4 => new DataprocPlatform(PlatformNames.DATAPROC_L4)
+      case PlatformNames.DATAPROC_SL_L4 => new DataprocPlatform(PlatformNames.DATAPROC_SL_L4)
+      case PlatformNames.DATAPROC_GKE_L4 => new DataprocPlatform(PlatformNames.DATAPROC_GKE_L4)
+      case PlatformNames.DATAPROC_GKE_T4 => new DataprocPlatform(PlatformNames.DATAPROC_GKE_T4)
+      case PlatformNames.EMR | PlatformNames.EMR_T4 =>
+        // if no GPU specified, then default to emr-t4 for backward compatibility
+        new EmrPlatform(PlatformNames.EMR_T4)
+      case PlatformNames.EMR_A10 => new EmrPlatform(PlatformNames.EMR_A10)
+      case PlatformNames.ONPREM => new OnPremPlatform
+      case p if p.isEmpty =>
+        logInfo(s"Platform is not specified. Using ${PlatformNames.ONPREM} as default.")
+        new OnPremPlatform
+      case _ => throw new IllegalArgumentException(s"Unsupported platform: $platformKey. " +
+        s"Options include ${PlatformNames.getAllNames.mkString(", ")}.")
+    }
+  }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
index 9b78a5b41..5e3475604 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.util.matching.Regex
 
+import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.yaml.snakeyaml.{DumperOptions, LoaderOptions, Yaml}
@@ -343,12 +344,7 @@ class AutoTuner(
   private val limitedLogicRecommendations: mutable.HashSet[String] = mutable.HashSet[String]()
   // When enabled, the profiler recommendations should only include updated settings.
   private var filterByUpdatedPropertiesEnabled: Boolean = true
-  val selectedPlatform: Platform = platform match {
-    case "databricks" => new DatabricksPlatform()
-    case "dataproc" => new DataprocPlatform()
-    case "emr" => new EmrPlatform()
-    case "onprem" => new OnPremPlatform()
-  }
+  val selectedPlatform: Platform = PlatformFactory.createInstance(platform)
 
   private def isCalculationEnabled(prop: String) : Boolean = {
     !limitedLogicRecommendations.contains(prop)
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Platform.scala
deleted file mode 100644
index 46c6bc8e0..000000000
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Platform.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nvidia.spark.rapids.tool.profiling
-
-/**
- * Represents a platform and its associated recommendations.
- */
-class Platform {
-  /**
-   * Recommendations to be excluded from the list of recommendations.
-   * These have the highest priority.
-   */
-  val recommendationsToExclude: Seq[String] = Seq.empty
-  /**
-   * Recommendations to be included in the final list of recommendations.
-   * These properties should be specific to the platform and not general Spark properties.
-   * For example: "spark.databricks.optimizer.dynamicFilePruning" for the Databricks platform.
-   *
-   * Represented as a tuple of (propertyKey, propertyValue).
-   */
-  val recommendationsToInclude: Seq[(String, String)] = Seq.empty
-  /**
-   * Dynamically calculates the recommendation for a specific Spark property by invoking
-   * the appropriate function based on `sparkProperty`.
-   * TODO: Implement this function and integrate with existing code in AutoTuner
-   *
-   * @param sparkProperty The Spark property for which the recommendation is calculated.
-   * @param args Variable list of arguments passed to the calculation function for dynamic
-   *             processing.
-   * @return Optional string containing the recommendation, or `None` if unavailable.
-   */
-  def getRecommendation(sparkProperty: String, args: Any*): Option[String] = None
-
-  /**
-   * Checks if the `property` is valid:
-   * 1. It should not be in exclusion list
-   * OR
-   * 2. It should be in the inclusion list
-   */
-  def isValidRecommendation(property: String): Boolean = {
-    !recommendationsToExclude.contains(property) ||
-      recommendationsToInclude.map(_._1).contains(property)
-  }
-
-  /**
-   * Checks if the `comment` is valid:
-   * 1. It should not have any property from the exclusion list
-   */
-  def isValidComment(comment: String): Boolean = {
-    recommendationsToExclude.forall(excluded => !comment.contains(excluded))
-  }
-}
-
-class DatabricksPlatform extends Platform {
-  override val recommendationsToExclude: Seq[String] = Seq(
-    "spark.executor.cores",
-    "spark.executor.instances",
-    "spark.executor.memory",
-    "spark.executor.memoryOverhead"
-  )
-  override val recommendationsToInclude: Seq[(String, String)] = Seq(
-    ("spark.databricks.optimizer.dynamicFilePruning", "false")
-  )
-}
-
-class DataprocPlatform extends Platform {}
-
-class EmrPlatform extends Platform {}
-
-class OnPremPlatform extends Platform {}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala
index b1044a4ed..c40d6bdca 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala
@@ -15,6 +15,7 @@
  */
 package com.nvidia.spark.rapids.tool.profiling
 
+import com.nvidia.spark.rapids.tool.PlatformNames
 import org.rogach.scallop.{ScallopConf, ScallopOption}
 import org.rogach.scallop.exceptions.ScallopException
 
@@ -70,9 +71,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/*
   val platform: ScallopOption[String] =
     opt[String](required = false,
       descr = "Cluster platform where Spark GPU workloads were executed. Options include " +
-        "onprem, dataproc, emr, databricks." +
-        " Default is onprem.",
-      default = Some(Profiler.DEFAULT_PLATFORM))
+        s"${PlatformNames.getAllNames.mkString(", ")}. Default is ${PlatformNames.ONPREM}.",
+      default = Some(PlatformNames.ONPREM))
   val generateTimeline: ScallopOption[Boolean] =
     opt[Boolean](required = false,
       descr = "Write an SVG graph out for the full application timeline.")
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index 44528966b..57108ec33 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
 import com.nvidia.spark.rapids.ThreadFactoryBuilder
-import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor}
+import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformNames}
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.internal.Logging
@@ -548,7 +548,7 @@ object Profiler {
   val COMPARE_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_compare"
   val COMBINED_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_combined"
   val SUBDIR = "rapids_4_spark_profile"
-  val DEFAULT_PLATFORM = "onprem"
+  val DEFAULT_PLATFORM: String = PlatformNames.ONPREM
 
   def getAutoTunerResultsAsString(props: Seq[RecommendedPropertyResult],
       comments: Seq[RecommendedCommentResult]): String = {
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
index 7baf5455e..eead523d6 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
@@ -20,6 +20,7 @@ import scala.collection.mutable.{ArrayBuffer,HashMap}
 import scala.io.{BufferedSource, Source}
 import scala.util.control.NonFatal
 
+import com.nvidia.spark.rapids.tool.PlatformFactory
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -44,16 +45,6 @@ class PluginTypeChecker(platform: String = "onprem",
   private val NA = "NA"
 
   private val DEFAULT_DS_FILE = "supportedDataSource.csv"
-  private val OPERATORS_SCORE_FILE_ONPREM = "operatorsScore.csv"
-  private val OPERATORS_SCORE_FILE_DATAPROC_T4 = "operatorsScore-dataproc-t4.csv"
-  private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv"
-  private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv"
-  private val OPERATORS_SCORE_FILE_DATAPROC_GKE_T4 = "operatorsScore-dataproc-gke-t4.csv"
-  private val OPERATORS_SCORE_FILE_DATAPROC_GKE_L4 = "operatorsScore-dataproc-gke-l4.csv"
-  private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv"
-  private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv"
-  private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv"
-  private val OPERATORS_SCORE_FILE_DATABRICKS_AZURE = "operatorsScore-databricks-azure.csv"
   private val SUPPORTED_EXECS_FILE = "supportedExecs.csv"
   private val SUPPORTED_EXPRS_FILE = "supportedExprs.csv"
 
@@ -101,20 +92,7 @@ class PluginTypeChecker(platform: String = "onprem",
     speedupFactorFile match {
       case None =>
         logInfo(s"Reading operators scores with platform: $platform")
-        val file = platform match {
-          // if no GPU specified, then default to dataproc-t4 for backward compatibility
-          case "dataproc-t4" | "dataproc" => OPERATORS_SCORE_FILE_DATAPROC_T4
-          case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4
-          case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4
-          case "dataproc-gke-t4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_T4
-          case "dataproc-gke-l4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_L4
-          // if no GPU specified, then default to emr-t4 for backward compatibility
-          case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4
-          case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10
-          case "databricks-aws" => OPERATORS_SCORE_FILE_DATABRICKS_AWS
-          case "databricks-azure" => OPERATORS_SCORE_FILE_DATABRICKS_AZURE
-          case _ => OPERATORS_SCORE_FILE_ONPREM
-        }
+        val file = PlatformFactory.createInstance(platform).getOperatorScoreFile
         val source = Source.fromResource(file)
         readSupportedOperators(source, "score").map(x => (x._1, x._2.toDouble))
       case Some(file) =>
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
index d998cb0ff..509402c9d 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
@@ -15,6 +15,7 @@
  */
 package com.nvidia.spark.rapids.tool.qualification
 
+import com.nvidia.spark.rapids.tool.PlatformNames
 import org.rogach.scallop.{ScallopConf, ScallopOption}
 import org.rogach.scallop.exceptions.ScallopException
 
@@ -155,10 +156,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/*
   val platform: ScallopOption[String] =
     opt[String](required = false,
       descr = "Cluster platform where Spark CPU workloads were executed. Options include " +
-        "onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, " +
-        "dataproc-gke-l4, emr-t4, emr-a10, databricks-aws, and databricks-azure. Default " +
-        "is onprem.",
-      default = Some("onprem"))
+        s"${PlatformNames.getAllNames.mkString(", ")}. Default is ${PlatformNames.ONPREM}.",
+      default = Some(PlatformNames.ONPREM))
   val speedupFactorFile: ScallopOption[String] =
     opt[String](required = false,
       descr = "Custom speedup factor file used to get estimated GPU speedup that is specific " +
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
index 36831386b..0428005f7 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
@@ -1283,10 +1283,10 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     assert(expectedResults == autoTunerOutput)
   }
 
-  test("test recommendations for databricks platform argument") {
+  test("test recommendations for databricks-aws platform argument") {
     val databricksWorkerInfo = buildWorkerInfoAsString()
     val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo,
-      getGpuAppMockInfoProvider, "databricks")
+      getGpuAppMockInfoProvider, "databricks-aws")
     val (properties, comments) = autoTuner.getRecommendedProperties()
     // Assert recommendations are excluded in properties
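
Reviewer sketch (illustrative, not part of the patch): how the unified factory resolves platform keys and operator score files, using only names introduced in this change. The `PlatformFactoryExample` wrapper and the "unknown-platform" key are hypothetical; the expected results follow from the `createInstance` and `getOperatorScoreFile` definitions in the new Platform.scala above.

    import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}

    object PlatformFactoryExample extends App {
      // Generic "dataproc" falls back to the T4 variant for backward compatibility.
      val dataproc = PlatformFactory.createInstance(PlatformNames.DATAPROC)
      assert(dataproc.getOperatorScoreFile == "operatorsScore-dataproc-t4.csv")

      // An empty platform key defaults to onprem, matching the renamed
      // operatorsScore-onprem.csv resource.
      val default = PlatformFactory.createInstance("")
      assert(default.getOperatorScoreFile == "operatorsScore-onprem.csv")

      // Databricks platforms exclude executor-level properties from recommendations.
      val databricks = PlatformFactory.createInstance(PlatformNames.DATABRICKS_AWS)
      assert(!databricks.isValidRecommendation("spark.executor.cores"))

      // Unsupported keys fail fast, with the message listing all valid options.
      try PlatformFactory.createInstance("unknown-platform")
      catch { case e: IllegalArgumentException => println(e.getMessage) }
    }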