Unified platform handling and fetching of operator score files (#661)
* Handle incorrect platform in core tools

Signed-off-by: Partho Sarthi <[email protected]>

* Add platform factory

Signed-off-by: Partho Sarthi <[email protected]>

* Add comments for default GPU types.

Signed-off-by: Partho Sarthi <[email protected]>

* Improve platform names in documentation

Signed-off-by: Partho Sarthi <[email protected]>

* Add docs

Signed-off-by: Partho Sarthi <[email protected]>

* Replace Map with pattern matching and rename PlatformTypes to PlatformNames

Signed-off-by: Partho Sarthi <[email protected]>

* Rename argument to platformName

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
parthosa authored Nov 20, 2023
1 parent cb34051 commit e612d79
Showing 9 changed files with 170 additions and 124 deletions.
File renamed without changes.
156 changes: 156 additions & 0 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -0,0 +1,156 @@
/*
 * Copyright (c) 2023, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.nvidia.spark.rapids.tool

import org.apache.spark.internal.Logging

/**
 * Utility object containing constants for various platform names.
 */
object PlatformNames {
  val DATABRICKS_AWS = "databricks-aws"
  val DATABRICKS_AZURE = "databricks-azure"
  val DATAPROC = "dataproc"
  val DATAPROC_GKE_L4 = "dataproc-gke-l4"
  val DATAPROC_GKE_T4 = "dataproc-gke-t4"
  val DATAPROC_L4 = "dataproc-l4"
  val DATAPROC_SL_L4 = "dataproc-serverless-l4"
  val DATAPROC_T4 = "dataproc-t4"
  val EMR = "emr"
  val EMR_A10 = "emr-a10"
  val EMR_T4 = "emr-t4"
  val ONPREM = "onprem"

  /**
   * Return a list of all platform names.
   */
  def getAllNames: List[String] = List(
    DATABRICKS_AWS, DATABRICKS_AZURE, DATAPROC, DATAPROC_GKE_L4, DATAPROC_GKE_T4,
    DATAPROC_L4, DATAPROC_SL_L4, DATAPROC_T4, EMR, EMR_A10, EMR_T4, ONPREM
  )
}

/**
 * Represents a platform and its associated recommendations.
 *
 * @param platformName Name of the platform. See [[PlatformNames]] for supported platform names.
 */
class Platform(platformName: String) {
  /**
   * Recommendations to be excluded from the list of recommendations.
   * These have the highest priority.
   */
  val recommendationsToExclude: Seq[String] = Seq.empty
  /**
   * Recommendations to be included in the final list of recommendations.
   * These properties should be specific to the platform and not general Spark properties.
   * For example: "spark.databricks.optimizer.dynamicFilePruning" for the Databricks platform.
   *
   * Represented as a tuple of (propertyKey, propertyValue).
   */
  val recommendationsToInclude: Seq[(String, String)] = Seq.empty
  /**
   * Dynamically calculates the recommendation for a specific Spark property by invoking
   * the appropriate function based on `sparkProperty`.
   * TODO: Implement this function and integrate with existing code in AutoTuner.
   *
   * @param sparkProperty The Spark property for which the recommendation is calculated.
   * @param args Variable list of arguments passed to the calculation function for dynamic
   *             processing.
   * @return Optional string containing the recommendation, or `None` if unavailable.
   */
  def getRecommendation(sparkProperty: String, args: Any*): Option[String] = None

  /**
   * Checks if the `property` is valid:
   * 1. It should not be in the exclusion list,
   * OR
   * 2. It should be in the inclusion list.
   */
  def isValidRecommendation(property: String): Boolean = {
    !recommendationsToExclude.contains(property) ||
      recommendationsToInclude.map(_._1).contains(property)
  }

  /**
   * Checks if the `comment` is valid:
   * 1. It should not contain any property from the exclusion list.
   */
  def isValidComment(comment: String): Boolean = {
    recommendationsToExclude.forall(excluded => !comment.contains(excluded))
  }

  def getOperatorScoreFile: String = {
    s"operatorsScore-$platformName.csv"
  }
}

class DatabricksPlatform(platformType: String) extends Platform(platformType) {
  override val recommendationsToExclude: Seq[String] = Seq(
    "spark.executor.cores",
    "spark.executor.instances",
    "spark.executor.memory",
    "spark.executor.memoryOverhead"
  )
  override val recommendationsToInclude: Seq[(String, String)] = Seq(
    ("spark.databricks.optimizer.dynamicFilePruning", "false")
  )
}

class DataprocPlatform(platformType: String) extends Platform(platformType)

class EmrPlatform(platformType: String) extends Platform(platformType)

class OnPremPlatform extends Platform(PlatformNames.ONPREM)

/**
 * Factory for creating instances of different platforms.
 * This factory supports various platforms and provides methods for creating
 * corresponding platform instances.
 */
object PlatformFactory extends Logging {
  /**
   * Creates an instance of a platform based on the specified platform key.
   * If the platform key is empty, returns an instance of the onprem platform.
   *
   * @param platformKey The key representing the desired platform.
   * @return An instance of the specified platform.
   * @throws IllegalArgumentException if the specified platform key is not supported.
   */
  def createInstance(platformKey: String): Platform = {
    platformKey match {
      case PlatformNames.DATABRICKS_AWS => new DatabricksPlatform(PlatformNames.DATABRICKS_AWS)
      case PlatformNames.DATABRICKS_AZURE => new DatabricksPlatform(PlatformNames.DATABRICKS_AZURE)
      case PlatformNames.DATAPROC | PlatformNames.DATAPROC_T4 =>
        // if no GPU is specified, default to dataproc-t4 for backward compatibility
        new DataprocPlatform(PlatformNames.DATAPROC_T4)
      case PlatformNames.DATAPROC_L4 => new DataprocPlatform(PlatformNames.DATAPROC_L4)
      case PlatformNames.DATAPROC_SL_L4 => new DataprocPlatform(PlatformNames.DATAPROC_SL_L4)
      case PlatformNames.DATAPROC_GKE_L4 => new DataprocPlatform(PlatformNames.DATAPROC_GKE_L4)
      case PlatformNames.DATAPROC_GKE_T4 => new DataprocPlatform(PlatformNames.DATAPROC_GKE_T4)
      case PlatformNames.EMR | PlatformNames.EMR_T4 =>
        // if no GPU is specified, default to emr-t4 for backward compatibility
        new EmrPlatform(PlatformNames.EMR_T4)
      case PlatformNames.EMR_A10 => new EmrPlatform(PlatformNames.EMR_A10)
      case PlatformNames.ONPREM => new OnPremPlatform
      case p if p.isEmpty =>
        logInfo(s"Platform is not specified. Using ${PlatformNames.ONPREM} as default.")
        new OnPremPlatform
      case _ => throw new IllegalArgumentException(s"Unsupported platform: $platformKey. " +
        s"Options include ${PlatformNames.getAllNames.mkString(", ")}.")
    }
  }
}
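For reference, a minimal usage sketch of the factory above (a hypothetical standalone caller, not part of this commit; resolution follows the pattern match and getOperatorScoreFile as defined in this file):

import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}

// "dataproc" falls back to the T4 variant for backward compatibility.
val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC)
assert(platform.getOperatorScoreFile == "operatorsScore-dataproc-t4.csv")

// An empty key logs an info message and falls back to the onprem platform.
val default = PlatformFactory.createInstance("")
assert(default.getOperatorScoreFile == "operatorsScore-onprem.csv")

// Any other key throws IllegalArgumentException listing the supported names:
// PlatformFactory.createInstance("not-a-platform")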
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex

import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
import org.yaml.snakeyaml.{DumperOptions, LoaderOptions, Yaml}
@@ -343,12 +344,7 @@ class AutoTuner(
  private val limitedLogicRecommendations: mutable.HashSet[String] = mutable.HashSet[String]()
  // When enabled, the profiler recommendations should only include updated settings.
  private var filterByUpdatedPropertiesEnabled: Boolean = true
  val selectedPlatform: Platform = platform match {
    case "databricks" => new DatabricksPlatform()
    case "dataproc" => new DataprocPlatform()
    case "emr" => new EmrPlatform()
    case "onprem" => new OnPremPlatform()
  }
  val selectedPlatform: Platform = PlatformFactory.createInstance(platform)

  private def isCalculationEnabled(prop: String) : Boolean = {
    !limitedLogicRecommendations.contains(prop)
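Worth noting: the replaced match had no default case, so any platform string outside the four hard-coded keys failed with a scala.MatchError at runtime; the factory instead raises a descriptive IllegalArgumentException. A small sketch of the difference (hypothetical inputs):

// Before: "dataproc-l4" => scala.MatchError (no matching case)
// After:  resolves to a DataprocPlatform instance
val platform = PlatformFactory.createInstance("dataproc-l4")
assert(platform.getOperatorScoreFile == "operatorsScore-dataproc-l4.csv")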

This file was deleted.

@@ -15,6 +15,7 @@
 */
package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException
@@ -70,9 +71,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
  val platform: ScallopOption[String] =
    opt[String](required = false,
      descr = "Cluster platform where Spark GPU workloads were executed. Options include " +
        "onprem, dataproc, emr, databricks." +
        " Default is onprem.",
      default = Some(Profiler.DEFAULT_PLATFORM))
        s"${PlatformNames.getAllNames.mkString(", ")}. Default is ${PlatformNames.ONPREM}.",
      default = Some(PlatformNames.ONPREM))
  val generateTimeline: ScallopOption[Boolean] =
    opt[Boolean](required = false,
      descr = "Write an SVG graph out for the full application timeline.")
@@ -23,7 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor}
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformNames}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
@@ -548,7 +548,7 @@ object Profiler {
  val COMPARE_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_compare"
  val COMBINED_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_combined"
  val SUBDIR = "rapids_4_spark_profile"
  val DEFAULT_PLATFORM = "onprem"
  val DEFAULT_PLATFORM: String = PlatformNames.ONPREM

  def getAutoTunerResultsAsString(props: Seq[RecommendedPropertyResult],
      comments: Seq[RecommendedCommentResult]): String = {
@@ -20,6 +20,7 @@ import scala.collection.mutable.{ArrayBuffer,HashMap}
import scala.io.{BufferedSource, Source}
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.PlatformFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -44,16 +45,6 @@ class PluginTypeChecker(platform: String = "onprem",
  private val NA = "NA"

  private val DEFAULT_DS_FILE = "supportedDataSource.csv"
  private val OPERATORS_SCORE_FILE_ONPREM = "operatorsScore.csv"
  private val OPERATORS_SCORE_FILE_DATAPROC_T4 = "operatorsScore-dataproc-t4.csv"
  private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv"
  private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv"
  private val OPERATORS_SCORE_FILE_DATAPROC_GKE_T4 = "operatorsScore-dataproc-gke-t4.csv"
  private val OPERATORS_SCORE_FILE_DATAPROC_GKE_L4 = "operatorsScore-dataproc-gke-l4.csv"
  private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv"
  private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv"
  private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv"
  private val OPERATORS_SCORE_FILE_DATABRICKS_AZURE = "operatorsScore-databricks-azure.csv"
  private val SUPPORTED_EXECS_FILE = "supportedExecs.csv"
  private val SUPPORTED_EXPRS_FILE = "supportedExprs.csv"
@@ -101,20 +92,7 @@
    speedupFactorFile match {
      case None =>
        logInfo(s"Reading operators scores with platform: $platform")
        val file = platform match {
          // if no GPU specified, then default to dataproc-t4 for backward compatibility
          case "dataproc-t4" | "dataproc" => OPERATORS_SCORE_FILE_DATAPROC_T4
          case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4
          case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4
          case "dataproc-gke-t4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_T4
          case "dataproc-gke-l4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_L4
          // if no GPU specified, then default to emr-t4 for backward compatibility
          case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4
          case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10
          case "databricks-aws" => OPERATORS_SCORE_FILE_DATABRICKS_AWS
          case "databricks-azure" => OPERATORS_SCORE_FILE_DATABRICKS_AZURE
          case _ => OPERATORS_SCORE_FILE_ONPREM
        }
        val file = PlatformFactory.createInstance(platform).getOperatorScoreFile
        val source = Source.fromResource(file)
        readSupportedOperators(source, "score").map(x => (x._1, x._2.toDouble))
      case Some(file) =>
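The hunk above collapses the per-platform filename map into a single factory call; a minimal sketch of the resulting resolution (a hypothetical standalone call, reusing the backward-compatible defaults from PlatformFactory):

// "emr" with no GPU suffix still resolves to the T4 variant,
// matching the old map's default.
val file = PlatformFactory.createInstance("emr").getOperatorScoreFile
assert(file == "operatorsScore-emr-t4.csv")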
@@ -15,6 +15,7 @@
 */
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException
@@ -155,10 +156,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
  val platform: ScallopOption[String] =
    opt[String](required = false,
      descr = "Cluster platform where Spark CPU workloads were executed. Options include " +
        "onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, " +
        "dataproc-gke-l4, emr-t4, emr-a10, databricks-aws, and databricks-azure. Default " +
        "is onprem.",
      default = Some("onprem"))
        s"${PlatformNames.getAllNames.mkString(", ")}. Default is ${PlatformNames.ONPREM}.",
      default = Some(PlatformNames.ONPREM))
  val speedupFactorFile: ScallopOption[String] =
    opt[String](required = false,
      descr = "Custom speedup factor file used to get estimated GPU speedup that is specific " +
@@ -1283,10 +1283,10 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
    assert(expectedResults == autoTunerOutput)
  }

  test("test recommendations for databricks platform argument") {
  test("test recommendations for databricks-aws platform argument") {
    val databricksWorkerInfo = buildWorkerInfoAsString()
    val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo,
      getGpuAppMockInfoProvider, "databricks")
      getGpuAppMockInfoProvider, "databricks-aws")
    val (properties, comments) = autoTuner.getRecommendedProperties()

    // Assert recommendations are excluded in properties
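For reference, a minimal sketch of the exclusion behavior this test exercises (a hypothetical direct use of the new Platform classes, outside the AutoTuner wiring):

val db = new DatabricksPlatform(PlatformNames.DATABRICKS_AWS)
assert(!db.isValidRecommendation("spark.executor.cores"))  // excluded on Databricks
assert(db.isValidRecommendation("spark.databricks.optimizer.dynamicFilePruning"))  // explicitly included
assert(db.isValidComment("Set executor memory overhead"))  // mentions no excluded property key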
