Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: cindyyuanjiang <[email protected]>
  • Loading branch information
cindyyuanjiang committed Oct 25, 2024
1 parent 41f6ff0 commit 7cd25e9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
* @param app the Application info objects that contains the SQL plans to be processed
*/
class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(app) {
// Name of the accumulator/metric whose values record time spent waiting on the
// GPU semaphore; matched by substring against accumulator names when
// aggregating per-stage wait times (see the stageToGpuSemaphoreWaitTime update).
val GPU_SEMAPHORE_WAIT_METRIC_NAME = "gpuSemaphoreWait"
// A map between (SQL ID, Node ID) and the set of stage IDs
// TODO: The Qualification should use this map instead of building a new set for each exec.
private val sqlPlanNodeIdToStageIds: HashMap[(Long, Long), Set[Int]] =
Expand Down Expand Up @@ -344,7 +345,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
if (accumInfo.infoRef.getName.contains("gpuSemaphoreWait")) {
if (accumInfo.infoRef.getName.contains(GPU_SEMAPHORE_WAIT_METRIC_NAME)) {
stageToGpuSemaphoreWaitTime(stageId) = sum
}
Some(AccumProfileResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.rapids.tool.benchmarks
import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal


/**
* This class is used to run the QualificationMain class as a benchmark.
* This can be used as a reference to write any benchmark class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ class AnalysisSuite extends FunSuite {
expectFile("sql"), expectFile("job"), expectFile("stage"))
}

// Verifies stage-level diagnostic metric aggregation for a single event log
// by comparing the aggregated results against a golden CSV expectation file.
test("test stage-level diagnostic aggregation simple") {
  val eventLogs = Array(s"$logDir/rapids_join_eventlog.zstd")
  val expectedCsv = "rapids_join_eventlog_stagediagnosticmetricsagg_expectation.csv"

  val apps = ToolTestUtils.processProfileApps(eventLogs, sparkSession)
  assert(apps.size == eventLogs.size)

  // Invoked for its side effect: populates the stage -> node names mapping
  // that the diagnostic aggregation relies on.
  new CollectInformation(apps).getSQLToStage

  import org.apache.spark.sql.functions.{col, concat_ws}
  import sparkSession.implicits._
  val diagnosticsDf = RawMetricProfilerView.getAggMetrics(apps).stageDiagnostics.toDF
  // Flatten the nodeNames array column to a comma-separated string so it can
  // be compared against the CSV expectation file.
  val actualDf = diagnosticsDf.withColumn("nodeNames", concat_ws(",", col("nodeNames")))
  compareMetrics(actualDf, expectedCsv)
}

private def testSqlMetricsAggregation(logs: Array[String], expectFileSQL: String,
expectFileJob: String, expectFileStage: String): Unit = {
val apps = ToolTestUtils.processProfileApps(logs, sparkSession)
Expand Down

0 comments on commit 7cd25e9

Please sign in to comment.