Split AutoTuner for Profiling and Qualification and Override BATCH_SIZE_BYTES #1471
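This PR moves the AutoTuner entry points out of com.nvidia.spark.rapids.tool.profiling into the shared com.nvidia.spark.rapids.tool.tuning package, giving the Profiling and Qualification tools their own configs providers (ProfilingAutoTunerConfigsProvider and QualificationAutoTunerConfigsProvider). A minimal sketch of the shape such a split could take, assuming a common trait behind the two objects; the trait name, members, and values below are illustrative assumptions, not code from this PR (the real providers also host DEFAULT_WORKER_INFO_PATH, loadClusterProps, and buildAutoTuner, as the hunks below show):

// Sketch only: names and values are placeholders for illustration.
trait AutoTunerConfigsProvider {
  // Shared default previously read from the AutoTuner companion object.
  val DEFAULT_WORKER_INFO_PATH: String = "./worker_info.yaml"

  // Recommendation for spark.rapids.sql.batchSizeBytes; concrete providers
  // may override it (the title's "Override BATCH_SIZE_BYTES").
  def batchSizeBytes: Long = 1L << 30 // placeholder: 1 GiB
}

object ProfilingAutoTunerConfigsProvider extends AutoTunerConfigsProvider

object QualificationAutoTunerConfigsProvider extends AutoTunerConfigsProvider {
  // Qualification estimates a GPU run from a CPU event log, so it may want a
  // different batch-size recommendation than profiling (placeholder value).
  override def batchSizeBytes: Long = 2L << 30
}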

Status: Open. Wants to merge 3 commits into base branch dev. Showing changes from 2 of the 3 commits.
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool
 import scala.annotation.tailrec

 import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
-import com.nvidia.spark.rapids.tool.profiling.ClusterProperties
+import com.nvidia.spark.rapids.tool.tuning.ClusterProperties

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
@@ -16,6 +16,7 @@
 package com.nvidia.spark.rapids.tool.profiling

 import com.nvidia.spark.rapids.tool.PlatformNames
+import com.nvidia.spark.rapids.tool.tuning.ProfilingAutoTunerConfigsProvider
 import org.rogach.scallop.{ScallopConf, ScallopOption}
 import org.rogach.scallop.exceptions.ScallopException

@@ -117,7 +118,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
     descr = "File path containing the system information of a worker node. It is assumed " +
       "that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
       "./worker_info.yaml",
-    default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))
+    default = Some(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH))

   validate(filterCriteria) {
     case crit if (crit.endsWith("-newest-filesystem") ||
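For context, the retargeted default above backs the worker-info option of the profiling CLI. A hypothetical invocation following the Usage string in this hunk (ProfileMain is the assumed entry point in this package, --worker-info follows Scallop's camelCase-to-hyphen naming for the workerInfo option, and --auto-tuner is assumed to be the flag that enables the AutoTuner, per the option's description):

java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.profiling.ProfileMain \
  --auto-tuner --worker-info ./worker_info.yaml /path/to/eventlog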
@@ -23,7 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal

 import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase}
-import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
+import com.nvidia.spark.rapids.tool.tuning.{AutoTuner, ProfilingAutoTunerConfigsProvider}
 import com.nvidia.spark.rapids.tool.views._
 import org.apache.hadoop.conf.Configuration

@@ -419,9 +419,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
     // assumptions made in the code
     if (appInfo.isDefined && appInfo.get.appInfo.head.pluginEnabled) {
       val appInfoProvider = AppSummaryInfoBaseProvider.fromAppInfo(appInfo)
-      val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
-      val clusterPropsOpt = loadClusterProps(workerInfoPath)
-      val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider,
+      val workerInfoPath = appArgs.workerInfo
+        .getOrElse(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH)
+      val clusterPropsOpt = ProfilingAutoTunerConfigsProvider.loadClusterProps(workerInfoPath)
+      val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider.buildAutoTuner(appInfoProvider,
         PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt), driverInfoProvider)

       // The autotuner allows skipping some properties,
@@ -21,9 +21,8 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import scala.collection.JavaConverters._

 import com.nvidia.spark.rapids.tool.{EventLogInfo, FailedEventLog, PlatformFactory, ToolBase}
-import com.nvidia.spark.rapids.tool.profiling.AutoTuner
 import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
-import com.nvidia.spark.rapids.tool.tuning.TunerContext
+import com.nvidia.spark.rapids.tool.tuning.{QualificationAutoTunerConfigsProvider, TunerContext}
 import com.nvidia.spark.rapids.tool.views.QualRawReportGenerator
 import org.apache.hadoop.conf.Configuration

@@ -147,7 +146,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
       // we need a platform per application because it's storing cluster information which could
       // vary between applications, especially when using dynamic allocation
       val platform = {
-        val clusterPropsOpt = AutoTuner.loadClusterProps(workerInfoPath)
+        val clusterPropsOpt = QualificationAutoTunerConfigsProvider.loadClusterProps(workerInfoPath)
         PlatformFactory.createInstance(platformArg, clusterPropsOpt)
       }
       val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
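Both the Profiler and Qualification call sites now resolve cluster properties through their provider's loadClusterProps. A sketch of what that load might consume and return, assuming the worker_info.yaml schema from the tools' documentation and an Option-returning parse (schema, signature, and failure behavior here are assumptions, not this PR's code):

// Illustrative worker_info.yaml contents (field names assumed):
//   system:
//     numCores: 32
//     memory: 212992MiB
//     numWorkers: 5
//   gpu:
//     name: T4
//     count: 4
//     memory: 15109MiB

import com.nvidia.spark.rapids.tool.tuning.{ClusterProperties, QualificationAutoTunerConfigsProvider}

// Assumed signature: a missing or malformed file yields None rather than
// failing the run, so PlatformFactory.createInstance still succeeds.
val clusterPropsOpt: Option[ClusterProperties] =
  QualificationAutoTunerConfigsProvider.loadClusterProps("./worker_info.yaml")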
@@ -16,7 +16,7 @@
 package com.nvidia.spark.rapids.tool.qualification

 import com.nvidia.spark.rapids.tool.PlatformNames
-import com.nvidia.spark.rapids.tool.profiling.AutoTuner
+import com.nvidia.spark.rapids.tool.tuning.QualificationAutoTunerConfigsProvider
 import org.rogach.scallop.{ScallopConf, ScallopOption}
 import org.rogach.scallop.exceptions.ScallopException

@@ -195,7 +195,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
     descr = "File path containing the system information of a worker node. It is assumed " +
       "that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
       "./worker_info.yaml",
-    default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))
+    default = Some(QualificationAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH))
   val clusterReport: ScallopOption[Boolean] =
     toggle("cluster-report",
       default = Some(true),
@@ -19,8 +19,7 @@ package com.nvidia.spark.rapids.tool.qualification
 import scala.util.control.NonFatal

 import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
-import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
-import com.nvidia.spark.rapids.tool.tuning.TunerContext
+import com.nvidia.spark.rapids.tool.tuning.{QualificationAutoTunerConfigsProvider, TunerContext}

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.AppFilterImpl

@@ -75,7 +74,8 @@ object QualificationMain extends Logging {
     // This platform instance should not be used for anything other than referencing the
     // files for this particular Platform.
     val referencePlatform = try {
-      val clusterPropsOpt = loadClusterProps(appArgs.workerInfo())
+      val clusterPropsOpt =
+        QualificationAutoTunerConfigsProvider.loadClusterProps(appArgs.workerInfo())
       PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt)
     } catch {
       case NonFatal(e) =>
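Taken together, the qualification path (QualificationMain's throwaway referencePlatform above, which per the comment exists only to locate platform-specific files, plus the per-application platforms built in Qualification) now resolves cluster properties through QualificationAutoTunerConfigsProvider, while the Profiler stays on ProfilingAutoTunerConfigsProvider. That separation is what lets each tool carry its own defaults, such as a qualification-specific BATCH_SIZE_BYTES.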