Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified platform handling and fetching of operator score files #661

Merged
merged 7 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.tool

import org.apache.spark.internal.Logging

/**
* Utility object containing constants for various platform names.
*/
/**
 * Central registry of the canonical string identifiers for every platform
 * supported by the tools. Use these constants instead of raw string literals
 * so platform names stay consistent across the CLI options, the factory,
 * and the operator-score file lookup.
 */
object PlatformNames {
  val DATABRICKS_AWS = "databricks-aws"
  val DATABRICKS_AZURE = "databricks-azure"
  val DATAPROC = "dataproc"
  val DATAPROC_GKE_L4 = "dataproc-gke-l4"
  val DATAPROC_GKE_T4 = "dataproc-gke-t4"
  val DATAPROC_L4 = "dataproc-l4"
  val DATAPROC_SL_L4 = "dataproc-serverless-l4"
  val DATAPROC_T4 = "dataproc-t4"
  val EMR = "emr"
  val EMR_A10 = "emr-a10"
  val EMR_T4 = "emr-t4"
  val ONPREM = "onprem"

  /**
   * Returns every supported platform name, e.g. for building CLI help text
   * or error messages listing the valid options.
   */
  def getAllNames: List[String] = List(
    DATABRICKS_AWS,
    DATABRICKS_AZURE,
    DATAPROC,
    DATAPROC_GKE_L4,
    DATAPROC_GKE_T4,
    DATAPROC_L4,
    DATAPROC_SL_L4,
    DATAPROC_T4,
    EMR,
    EMR_A10,
    EMR_T4,
    ONPREM)
}

/**
* Represents a platform and its associated recommendations.
*
* @param platformName Name of the platform. See [[PlatformNames]] for supported platform names.
*/
/**
 * Represents a platform and its associated recommendations.
 *
 * @param platformName Name of the platform. See [[PlatformNames]] for supported platform names.
 */
class Platform(platformName: String) {
  /**
   * Recommendations to be excluded from the list of recommendations.
   * These have the highest priority.
   */
  val recommendationsToExclude: Seq[String] = Seq.empty
  /**
   * Recommendations to be included in the final list of recommendations.
   * These properties should be specific to the platform and not general Spark properties.
   * For example: "spark.databricks.optimizer.dynamicFilePruning" for the Databricks platform.
   *
   * Represented as a tuple of (propertyKey, propertyValue).
   */
  val recommendationsToInclude: Seq[(String, String)] = Seq.empty

  /**
   * Dynamically calculates the recommendation for a specific Spark property by invoking
   * the appropriate function based on `sparkProperty`.
   * TODO: Implement this function and integrate with existing code in AutoTuner
   *
   * @param sparkProperty The Spark property for which the recommendation is calculated.
   * @param args Variable list of arguments passed to the calculation function for dynamic
   *             processing.
   * @return Optional string containing the recommendation, or `None` if unavailable.
   */
  def getRecommendation(sparkProperty: String, args: Any*): Option[String] = None

  /**
   * Checks if the `property` is valid:
   * 1. It should not be in exclusion list
   *   OR
   * 2. It should be in the inclusion list
   */
  def isValidRecommendation(property: String): Boolean = {
    // `exists` checks the keys directly instead of materializing an
    // intermediate collection with `map(_._1)` on every call.
    !recommendationsToExclude.contains(property) ||
      recommendationsToInclude.exists(_._1 == property)
  }

  /**
   * Checks if the `comment` is valid:
   * 1. It should not have any property from the exclusion list
   */
  def isValidComment(comment: String): Boolean = {
    recommendationsToExclude.forall(excluded => !comment.contains(excluded))
  }

  /**
   * Name of the bundled CSV resource holding the operator scores for this platform,
   * e.g. "operatorsScore-emr-t4.csv" for platform name "emr-t4".
   */
  def getOperatorScoreFile: String = {
    s"operatorsScore-$platformName.csv"
  }
}

/**
 * Platform class for the Databricks flavors (see [[PlatformNames.DATABRICKS_AWS]]
 * and [[PlatformNames.DATABRICKS_AZURE]]).
 *
 * Overrides the base recommendation lists: executor sizing properties are
 * excluded from the output, and the Databricks-specific dynamic file pruning
 * property is always included.
 */
class DatabricksPlatform(platformType: String) extends Platform(platformType) {
  // Executor sizing properties that must never appear in the recommendations.
  override val recommendationsToExclude: Seq[String] =
    Seq(
      "spark.executor.cores",
      "spark.executor.instances",
      "spark.executor.memory",
      "spark.executor.memoryOverhead")

  // Platform-specific (propertyKey -> propertyValue) pairs to always recommend.
  override val recommendationsToInclude: Seq[(String, String)] =
    Seq("spark.databricks.optimizer.dynamicFilePruning" -> "false")
}

/** Platform class for the Dataproc variants (t4/l4/serverless/GKE); see [[PlatformFactory]]. */
class DataprocPlatform(platformType: String) extends Platform(platformType)

/** Platform class for the EMR variants (emr, emr-t4, emr-a10); see [[PlatformFactory]]. */
class EmrPlatform(platformType: String) extends Platform(platformType)

/** Platform class for on-prem clusters; always uses the fixed [[PlatformNames.ONPREM]] name. */
class OnPremPlatform extends Platform(PlatformNames.ONPREM)

/**
* Factory for creating instances of different platforms.
* This factory supports various platforms and provides methods for creating
* corresponding platform instances.
*/
/**
 * Factory for creating instances of different platforms.
 * This factory supports various platforms and provides methods for creating
 * corresponding platform instances.
 */
object PlatformFactory extends Logging {
  /**
   * Creates an instance of a platform based on the specified platform key.
   * If the platform key is null or empty, returns an instance of onprem platform.
   *
   * @param platformKey The key representing the desired platform.
   * @return An instance of the specified platform.
   * @throws IllegalArgumentException if the specified platform key is not supported.
   */
  def createInstance(platformKey: String): Platform = {
    platformKey match {
      // Handle null/empty first: a null key would otherwise throw an NPE inside
      // the `p.isEmpty` pattern guard instead of falling back to the default.
      case null | "" =>
        logInfo(s"Platform is not specified. Using ${PlatformNames.ONPREM} as default.")
        new OnPremPlatform
      case PlatformNames.DATABRICKS_AWS => new DatabricksPlatform(PlatformNames.DATABRICKS_AWS)
      case PlatformNames.DATABRICKS_AZURE => new DatabricksPlatform(PlatformNames.DATABRICKS_AZURE)
      case PlatformNames.DATAPROC | PlatformNames.DATAPROC_T4 =>
        // if no GPU specified, then default to dataproc-t4 for backward compatibility
        new DataprocPlatform(PlatformNames.DATAPROC_T4)
      case PlatformNames.DATAPROC_L4 => new DataprocPlatform(PlatformNames.DATAPROC_L4)
      case PlatformNames.DATAPROC_SL_L4 => new DataprocPlatform(PlatformNames.DATAPROC_SL_L4)
      case PlatformNames.DATAPROC_GKE_L4 => new DataprocPlatform(PlatformNames.DATAPROC_GKE_L4)
      case PlatformNames.DATAPROC_GKE_T4 => new DataprocPlatform(PlatformNames.DATAPROC_GKE_T4)
      case PlatformNames.EMR | PlatformNames.EMR_T4 =>
        // if no GPU specified, then default to emr-t4 for backward compatibility
        new EmrPlatform(PlatformNames.EMR_T4)
      case PlatformNames.EMR_A10 => new EmrPlatform(PlatformNames.EMR_A10)
      case PlatformNames.ONPREM => new OnPremPlatform
      case _ => throw new IllegalArgumentException(s"Unsupported platform: $platformKey. " +
        s"Options include ${PlatformNames.getAllNames.mkString(", ")}.")
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex

import com.nvidia.spark.rapids.tool.{Platform, PlatformFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
import org.yaml.snakeyaml.{DumperOptions, LoaderOptions, Yaml}
Expand Down Expand Up @@ -343,12 +344,7 @@ class AutoTuner(
private val limitedLogicRecommendations: mutable.HashSet[String] = mutable.HashSet[String]()
// When enabled, the profiler recommendations should only include updated settings.
private var filterByUpdatedPropertiesEnabled: Boolean = true
val selectedPlatform: Platform = platform match {
case "databricks" => new DatabricksPlatform()
case "dataproc" => new DataprocPlatform()
case "emr" => new EmrPlatform()
case "onprem" => new OnPremPlatform()
}
val selectedPlatform: Platform = PlatformFactory.createInstance(platform)

private def isCalculationEnabled(prop: String) : Boolean = {
!limitedLogicRecommendations.contains(prop)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down Expand Up @@ -67,9 +68,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
val platform: ScallopOption[String] =
opt[String](required = false,
descr = "Cluster platform where Spark GPU workloads were executed. Options include " +
"onprem, dataproc, emr, databricks." +
" Default is onprem.",
default = Some(Profiler.DEFAULT_PLATFORM))
s"${PlatformNames.getAllNames.mkString(", ")}. Default is ${PlatformNames.ONPREM}.",
default = Some(PlatformNames.ONPREM))
val generateTimeline: ScallopOption[Boolean] =
opt[Boolean](required = false,
descr = "Write an SVG graph out for the full application timeline.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor}
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformNames}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -533,7 +533,7 @@ object Profiler {
val COMPARE_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_compare"
val COMBINED_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_combined"
val SUBDIR = "rapids_4_spark_profile"
val DEFAULT_PLATFORM = "onprem"
val DEFAULT_PLATFORM: String = PlatformNames.ONPREM

def getAutoTunerResultsAsString(props: Seq[RecommendedPropertyResult],
comments: Seq[RecommendedCommentResult]): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.collection.mutable.{ArrayBuffer,HashMap}
import scala.io.{BufferedSource, Source}
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.PlatformFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

Expand All @@ -44,16 +45,6 @@ class PluginTypeChecker(platform: String = "onprem",
private val NA = "NA"

private val DEFAULT_DS_FILE = "supportedDataSource.csv"
private val OPERATORS_SCORE_FILE_ONPREM = "operatorsScore.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_T4 = "operatorsScore-dataproc-t4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_GKE_T4 = "operatorsScore-dataproc-gke-t4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_GKE_L4 = "operatorsScore-dataproc-gke-l4.csv"
private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv"
private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv"
private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv"
private val OPERATORS_SCORE_FILE_DATABRICKS_AZURE = "operatorsScore-databricks-azure.csv"
private val SUPPORTED_EXECS_FILE = "supportedExecs.csv"
private val SUPPORTED_EXPRS_FILE = "supportedExprs.csv"

Expand Down Expand Up @@ -101,20 +92,7 @@ class PluginTypeChecker(platform: String = "onprem",
speedupFactorFile match {
case None =>
logInfo(s"Reading operators scores with platform: $platform")
val file = platform match {
// if no GPU specified, then default to dataproc-t4 for backward compatibility
case "dataproc-t4" | "dataproc" => OPERATORS_SCORE_FILE_DATAPROC_T4
case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4
case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4
case "dataproc-gke-t4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_T4
case "dataproc-gke-l4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_L4
// if no GPU specified, then default to emr-t4 for backward compatibility
case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4
case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10
case "databricks-aws" => OPERATORS_SCORE_FILE_DATABRICKS_AWS
case "databricks-azure" => OPERATORS_SCORE_FILE_DATABRICKS_AZURE
case _ => OPERATORS_SCORE_FILE_ONPREM
}
val file = PlatformFactory.createInstance(platform).getOperatorScoreFile
val source = Source.fromResource(file)
readSupportedOperators(source, "score").map(x => (x._1, x._2.toDouble))
case Some(file) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down Expand Up @@ -155,10 +156,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
val platform: ScallopOption[String] =
opt[String](required = false,
descr = "Cluster platform where Spark CPU workloads were executed. Options include " +
"onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, " +
"dataproc-gke-l4, emr-t4, emr-a10, databricks-aws, and databricks-azure. Default " +
"is onprem.",
default = Some("onprem"))
s"${PlatformNames.getAllNames.mkString(", ")}. Default is ${PlatformNames.ONPREM}.",
default = Some(PlatformNames.ONPREM))
val speedupFactorFile: ScallopOption[String] =
opt[String](required = false,
descr = "Custom speedup factor file used to get estimated GPU speedup that is specific " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,10 +1283,10 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(expectedResults == autoTunerOutput)
}

test("test recommendations for databricks platform argument") {
test("test recommendations for databricks-aws platform argument") {
val databricksWorkerInfo = buildWorkerInfoAsString()
val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo,
getGpuAppMockInfoProvider, "databricks")
getGpuAppMockInfoProvider, "databricks-aws")
val (properties, comments) = autoTuner.getRecommendedProperties()

// Assert recommendations are excluded in properties
Expand Down
Loading