Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Nov 28, 2023
1 parent 8720dd5 commit 54749ac
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -822,15 +822,12 @@ class AutoTuner(
* Analyzes unsupported driver logs and generates recommendations for configuration properties.
*/
private def recommendFromDriverLogs(): Unit = {
val doc_url = "https://nvidia.github.io/spark-rapids/docs/additional-functionality/" +
"advanced_configs.html#advanced-configuration"
// Iterate through unsupported operators' reasons and check for matching properties
unsupportedOperators.map(_.reason).foreach { operatorReason =>
recommendationsFromDriverLogs.collect {
case (config, recommendedValue) if operatorReason.contains(config) =>
appendRecommendation(config, recommendedValue)
appendComment(s"Using $config does not guarantee to produce the same results as CPU. " +
s"Please refer to $doc_url")
appendComment(commentForExperimentalConfig(config))
}
}
}
Expand Down Expand Up @@ -989,6 +986,8 @@ object AutoTuner extends Logging {
val DEF_READ_SIZE_THRESHOLD = 100 * 1024L * 1024L * 1024L
val DEFAULT_WORKER_INFO_PATH = "./worker_info.yaml"
val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/additional-functionality/" +
"advanced_configs.html#advanced-configuration"

val commentsForMissingProps: Map[String, String] = Map(
"spark.executor.memory" ->
Expand Down Expand Up @@ -1042,6 +1041,11 @@ object AutoTuner extends Logging {
"spark.rapids.sql.incompatibleDateFormats.enabled" -> "true"
)

def commentForExperimentalConfig(config: String): String = {
s"Using $config does not guarantee to produce the same results as CPU. " +
s"Please refer to $DOC_URL."
}

// the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1360,9 +1360,140 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(expectedResults == autoTunerOutput)
}

test("Recommendations generated for unsupported operators from driver logs") {
// mock the properties loaded from eventLog
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider)
test("Recommendations generated for unsupported operators from driver logs only") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"FromUnixTime", 1,
"Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
"CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
"results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
"to force onto GPU.")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
|
|Comments:
|- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations generated for unsupported operators from driver and event logs") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"FromUnixTime", 1,
"Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
"CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
"results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
"to force onto GPU.")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo,
getGpuAppMockInfoProvider, PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.executor.cores=16
|--conf spark.executor.instances=8
|--conf spark.executor.memory=32768m
|--conf spark.executor.memoryOverhead=8396m
|--conf spark.rapids.memory.pinnedPool.size=4096m
|--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
|--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
|--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
|--conf spark.rapids.sql.multiThreadedRead.numThreads=20
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.executor.instances' was not set.
|- 'spark.executor.memoryOverhead' was not set.
|- 'spark.rapids.memory.pinnedPool.size' was not set.
|- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
|- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
|- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
|- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- ${AutoTuner.classPathComments("rapids.jars.missing")}
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}


test("Recommendations generated for empty unsupported operators from driver logs only") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), Seq.empty)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
|
|Comments:
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations not generated for unsupported operators from driver logs") {
// This test does not generate any recommendations for the unsupported operator 'Literal'
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"Literal", 3,
"expression Literal 1700518632630000 produces an unsupported type TimestampType")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
Expand Down

0 comments on commit 54749ac

Please sign in to comment.