diff --git a/core/docs/spark-qualification-tool.md b/core/docs/spark-qualification-tool.md
index 023f4318b..25dd3cb5f 100644
--- a/core/docs/spark-qualification-tool.md
+++ b/core/docs/spark-qualification-tool.md
@@ -29,6 +29,7 @@ applicable environments. Here are the cluster information for the ETL benchmark
 | Dataproc (T4)            | 4x n1-standard-32 | 4x n1-standard-32 + 8x T4 16GB |
 | Dataproc (L4)            | 8x n1-standard-16 | 8x g2-standard-16              |
 | Dataproc Serverless (L4) | 8x 16 cores       | 8x 16 cores + 8x L4 24GB       |
+| Dataproc GKE (T4)        | 4x n1-standard-32 | 4x n1-standard-32 + 8x T4 16GB |
 | EMR (T4)                 | 8x m5d.8xlarge    | 4x g4dn.12xlarge               |
 | EMR (A10)                | 8x m5d.8xlarge    | 8x g5.8xlarge                  |
 | Databricks AWS           | 8x m6gd.8xlage    | 8x g5.8xlarge                  |
@@ -248,8 +249,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
 -p, --per-sql                Report at the individual SQL query level.
     --platform               Cluster platform where Spark CPU workloads were
                              executed. Options include onprem, dataproc-t4,
-                             dataproc-l4, dataproc-serverless-l4, emr-t4,
-                             emr-a10, databricks-aws, and databricks-azure.
+                             dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4,
+                             emr-t4, emr-a10, databricks-aws, and databricks-azure.
                              Default is onprem.
 -r, --report-read-schema    Whether to output the read formats and
                             datatypes to the CSV file. This can be very
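Reviewer note: the new `dataproc-gke-t4` value flows through the same `--platform` path as the existing platforms. As a minimal sketch (not part of this change), the tool can also be driven programmatically the way the suite below does; the event log and output paths here are placeholders:

```scala
import com.nvidia.spark.rapids.tool.qualification.{QualificationArgs, QualificationMain}

object QualifyDataprocGkeT4 {
  def main(args: Array[String]): Unit = {
    // Placeholder paths -- point these at a real event log and output directory.
    val eventLog = "/tmp/spark-events/application_0001"
    val appArgs = new QualificationArgs(Array(
      "--output-directory", "/tmp/qual_output",
      "--platform", "dataproc-gke-t4", // the platform value added by this change
      eventLog))
    // mainInternal returns the exit code along with the qualification summary
    val (exitCode, _) = QualificationMain.mainInternal(appArgs)
    assert(exitCode == 0, s"qualification run failed with exit code $exitCode")
  }
}
```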
diff --git a/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv b/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
new file mode 100644
index 000000000..493f1154c
--- /dev/null
+++ b/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
@@ -0,0 +1,256 @@
+CPUOperator,Score
+CoalesceExec,4.25
+CollectLimitExec,4.25
+ExpandExec,7.76
+FileSourceScanExec,3.64
+FilterExec,4.47
+GenerateExec,4.25
+GlobalLimitExec,4.25
+LocalLimitExec,4.25
+ProjectExec,4.25
+RangeExec,4.25
+SampleExec,4.25
+SortExec,4.25
+TakeOrderedAndProjectExec,20.96
+HashAggregateExec,5.54
+ObjectHashAggregateExec,5.54
+SortAggregateExec,5.54
+DataWritingCommandExec,4.25
+ExecutedCommandExec,4.25
+BatchScanExec,3.64
+ShuffleExchangeExec,5.21
+BroadcastHashJoinExec,6.42
+BroadcastNestedLoopJoinExec,17.46
+CartesianProductExec,4.25
+ShuffledHashJoinExec,4.25
+SortMergeJoinExec,7.4
+WindowExec,4.25
+Abs,4.25
+Acos,4.25
+Acosh,4.25
+Add,4.25
+AggregateExpression,4.25
+Alias,4.25
+And,4.25
+ApproximatePercentile,4.25
+ArrayContains,4.25
+ArrayExcept,4.25
+ArrayExists,4.25
+ArrayIntersect,4.25
+ArrayMax,4.25
+ArrayMin,4.25
+ArrayRemove,4.25
+ArrayRepeat,4.25
+ArrayTransform,4.25
+ArrayUnion,4.25
+ArraysOverlap,4.25
+ArraysZip,4.25
+Asin,4.25
+Asinh,4.25
+AtLeastNNonNulls,4.25
+Atan,4.25
+Atanh,4.25
+AttributeReference,4.25
+Average,4.25
+BRound,4.25
+BitLength,4.25
+BitwiseAnd,4.25
+BitwiseNot,4.25
+BitwiseOr,4.25
+BitwiseXor,4.25
+CaseWhen,4.25
+Cbrt,4.25
+Ceil,4.25
+CheckOverflow,4.25
+Coalesce,4.25
+CollectList,4.25
+CollectSet,4.25
+Concat,4.25
+ConcatWs,4.25
+Contains,4.25
+Conv,4.25
+Cos,4.25
+Cosh,4.25
+Cot,4.25
+Count,4.25
+CreateArray,4.25
+CreateMap,4.25
+CreateNamedStruct,4.25
+CurrentRow$,4.25
+DateAdd,4.25
+DateAddInterval,4.25
+DateDiff,4.25
+DateFormatClass,4.25
+DateSub,4.25
+DayOfMonth,4.25
+DayOfWeek,4.25
+DayOfYear,4.25
+DenseRank,4.25
+Divide,4.25
+DynamicPruningExpression,4.25
+ElementAt,4.25
+EndsWith,4.25
+EqualNullSafe,4.25
+EqualTo,4.25
+Exp,4.25
+Explode,4.25
+Expm1,4.25
+First,4.25
+Flatten,4.25
+Floor,4.25
+FromUTCTimestamp,4.25
+FromUnixTime,4.25
+GetArrayItem,4.25
+GetArrayStructFields,4.25
+GetJsonObject,4.25
+GetMapValue,4.25
+GetStructField,4.25
+GetTimestamp,4.25
+GreaterThan,4.25
+GreaterThanOrEqual,4.25
+Greatest,4.25
+HiveGenericUDF,4.25
+HiveSimpleUDF,4.25
+Hour,4.25
+Hypot,4.25
+If,4.25
+In,4.25
+InSet,4.25
+InitCap,4.25
+InputFileBlockLength,4.25
+InputFileBlockStart,4.25
+InputFileName,4.25
+IntegralDivide,4.25
+IsNaN,4.25
+IsNotNull,4.25
+IsNull,4.25
+JsonToStructs,4.25
+JsonTuple,4.25
+KnownFloatingPointNormalized,4.25
+KnownNotNull,4.25
+Lag,4.25
+LambdaFunction,4.25
+Last,4.25
+LastDay,4.25
+Lead,4.25
+Least,4.25
+Length,4.25
+LessThan,4.25
+LessThanOrEqual,4.25
+Like,4.25
+Literal,4.25
+Log,4.25
+Log10,4.25
+Log1p,4.25
+Log2,4.25
+Logarithm,4.25
+Lower,4.25
+MakeDecimal,4.25
+MapConcat,4.25
+MapEntries,4.25
+MapFilter,4.25
+MapKeys,4.25
+MapValues,4.25
+Max,4.25
+Md5,4.25
+MicrosToTimestamp,4.25
+MillisToTimestamp,4.25
+Min,4.25
+Minute,4.25
+MonotonicallyIncreasingID,4.25
+Month,4.25
+Multiply,4.25
+Murmur3Hash,4.25
+NaNvl,4.25
+NamedLambdaVariable,4.25
+NormalizeNaNAndZero,4.25
+Not,4.25
+NthValue,4.25
+OctetLength,4.25
+Or,4.25
+PercentRank,4.25
+PivotFirst,4.25
+Pmod,4.25
+PosExplode,4.25
+Pow,4.25
+PreciseTimestampConversion,4.25
+PromotePrecision,4.25
+PythonUDF,4.25
+Quarter,4.25
+RLike,4.25
+RaiseError,4.25
+Rand,4.25
+Rank,4.25
+RegExpExtract,4.25
+RegExpExtractAll,4.25
+RegExpReplace,4.25
+Remainder,4.25
+ReplicateRows,4.25
+Reverse,4.25
+Rint,4.25
+Round,4.25
+RowNumber,4.25
+ScalaUDF,4.25
+ScalarSubquery,4.25
+Second,4.25
+SecondsToTimestamp,4.25
+Sequence,4.25
+ShiftLeft,4.25
+ShiftRight,4.25
+ShiftRightUnsigned,4.25
+Signum,4.25
+Sin,4.25
+Sinh,4.25
+Size,4.25
+SortArray,4.25
+SortOrder,4.25
+SparkPartitionID,4.25
+SpecifiedWindowFrame,4.25
+Sqrt,4.25
+StartsWith,4.25
+StddevPop,4.25
+StddevSamp,4.25
+StringInstr,4.25
+StringLPad,4.25
+StringLocate,4.25
+StringRPad,4.25
+StringRepeat,4.25
+StringReplace,4.25
+StringSplit,4.25
+StringToMap,4.25
+StringTranslate,4.25
+StringTrim,4.25
+StringTrimLeft,4.25
+StringTrimRight,4.25
+Substring,4.25
+SubstringIndex,4.25
+Subtract,4.25
+Sum,4.25
+Tan,4.25
+Tanh,4.25
+TimeAdd,4.25
+ToDegrees,4.25
+ToRadians,4.25
+ToUnixTimestamp,4.25
+TransformKeys,4.25
+TransformValues,4.25
+UnaryMinus,4.25
+UnaryPositive,4.25
+UnboundedFollowing$,4.25
+UnboundedPreceding$,4.25
+UnixTimestamp,4.25
+UnscaledValue,4.25
+Upper,4.25
+VariancePop,4.25
+VarianceSamp,4.25
+WeekDay,4.25
+WindowExpression,4.25
+WindowSpecDefinition,4.25
+XxHash64,4.25
+Year,4.25
+AggregateInPandasExec,1.2
+ArrowEvalPythonExec,1.2
+FlatMapGroupsInPandasExec,1.2
+FlatMapCoGroupsInPandasExec,1.2
+MapInPandasExec,1.2
+WindowInPandasExec,1.2
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
index bd5641363..c01586159 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
@@ -48,6 +48,8 @@ class PluginTypeChecker(platform: String = "onprem",
   private val OPERATORS_SCORE_FILE_DATAPROC_T4 = "operatorsScore-dataproc-t4.csv"
   private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv"
   private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv"
+  // TODO: replace these placeholder scores with benchmarked GKE T4 speedup factors
+  private val OPERATORS_SCORE_FILE_DATAPROC_GKE_T4 = "operatorsScore-dataproc-gke-t4.csv"
   private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv"
   private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv"
   private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv"
@@ -104,6 +106,7 @@ class PluginTypeChecker(platform: String = "onprem",
       case "dataproc-t4" | "dataproc" => OPERATORS_SCORE_FILE_DATAPROC_T4
       case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4
       case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4
+      case "dataproc-gke-t4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_T4
       // if no GPU specified, then default to emr-t4 for backward compatibility
       case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4
       case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10
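Reviewer note: the score files above are plain two-column CSVs keyed by operator name. The parsing logic lives in `PluginTypeChecker` and is untouched by this diff; purely as an illustrative sketch of what mapping such a resource into speedup factors involves (the method name and error handling here are assumptions, not the tool's actual internals):

```scala
import scala.io.Source

// Illustrative only: read an operatorsScore-*.csv resource into a lookup table.
// The real PluginTypeChecker implementation is not shown in this diff.
def loadOperatorScores(resourceName: String): Map[String, Double] = {
  val stream = getClass.getResourceAsStream(s"/$resourceName")
  require(stream != null, s"resource $resourceName not found on the classpath")
  Source.fromInputStream(stream).getLines()
    .drop(1) // skip the "CPUOperator,Score" header
    .map(_.split(",", 2))
    .collect { case Array(op, score) => op -> score.toDouble }
    .toMap
}

// With the new file wired up, dataproc-gke-t4 lookups resolve like this:
val gkeScores = loadOperatorScores("operatorsScore-dataproc-gke-t4.csv")
assert(gkeScores("WindowExec") == 4.25)
```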
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
index 075c3512f..76a2fba8e 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
@@ -148,9 +148,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
   val platform: ScallopOption[String] =
     opt[String](required = false,
      descr = "Cluster platform where Spark CPU workloads were executed. Options include " +
-        "onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, emr-t4, emr-a10, " +
-        "databricks-aws, and databricks-azure. " +
-        "Default is onprem.",
+        "onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, emr-t4, " +
+        "emr-a10, databricks-aws, and databricks-azure. Default is onprem.",
       default = Some("onprem"))
   val speedupFactorFile: ScallopOption[String] =
     opt[String](required = false,
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
index e7720fd28..6a183dd3e 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
@@ -188,12 +188,19 @@ class PluginTypeCheckerSuite extends FunSuite with Logging {
     assert(checker.getSpeedupFactor("WindowExec") == 4.25)
     assert(checker.getSpeedupFactor("Ceil") == 4.25)
   }
+
   test("supported operator score from dataproc-l4") {
     val checker = new PluginTypeChecker("dataproc-l4")
     assert(checker.getSpeedupFactor("UnionExec") == 4.16)
     assert(checker.getSpeedupFactor("Ceil") == 4.16)
   }
 
+  test("supported operator score from dataproc-gke-t4") {
+    val checker = new PluginTypeChecker("dataproc-gke-t4")
+    assert(checker.getSpeedupFactor("WindowExec") == 4.25)
+    assert(checker.getSpeedupFactor("Ceil") == 4.25)
+  }
+
   test("supported operator score from emr-a10") {
     val checker = new PluginTypeChecker("emr-a10")
     assert(checker.getSpeedupFactor("UnionExec") == 2.59)
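Reviewer note: the end-to-end coverage added below copies the per-platform template already used for the other platforms. If the platform list keeps growing, the copies could be collapsed into a single loop over platform names; a sketch of that refactor, reusing the suite's own helpers (`eventLog`, `createSparkSession`, `readExpectedFile`, and the suite's `java.io.File` import) -- the loop itself is a suggestion, not what this PR does:

```scala
// Suggested consolidation (not in this PR): run the same end-to-end check
// once per platform instead of duplicating the block for each one.
Seq("dataproc-gke-t4", "databricks-aws").foreach { platform =>
  TrampolineUtil.withTempDir { outpath =>
    val appArgs = new QualificationArgs(Array(
      "--output-directory", outpath.getAbsolutePath,
      "--platform", platform,
      eventLog))
    val (exit, _) = QualificationMain.mainInternal(appArgs)
    assert(exit == 0)
    // the qualification run stops the SparkSession, so restore it before
    // reading the output csv back
    createSparkSession()
    val outputResults = s"$outpath/rapids_4_spark_qualification_output/" +
      "rapids_4_spark_qualification_output.csv"
    assert(readExpectedFile(new File(outputResults)).collect().size == 1)
  }
}
```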
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index 502ddffc1..982a65ec8 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -1529,6 +1529,30 @@
       assert(outputActual.collect().size == 1)
     }
 
+    // run the qualification tool for dataproc-gke-t4
+    TrampolineUtil.withTempDir { outpath =>
+      val appArgs = new QualificationArgs(Array(
+        "--output-directory",
+        outpath.getAbsolutePath,
+        "--platform",
+        "dataproc-gke-t4",
+        eventLog))
+
+      val (exit, _) =
+        QualificationMain.mainInternal(appArgs)
+      assert(exit == 0)
+
+      // the code above that runs the Spark query stops the SparkSession,
+      // so create a new one to read in the csv file
+      createSparkSession()
+
+      // validate that the SQL description in the csv file escapes commas properly
+      val outputResults = s"$outpath/rapids_4_spark_qualification_output/" +
+        s"rapids_4_spark_qualification_output.csv"
+      val outputActual = readExpectedFile(new File(outputResults))
+      assert(outputActual.collect().size == 1)
+    }
+
     // run the qualification tool for databricks-aws
     TrampolineUtil.withTempDir { outpath =>
       val appArgs = new QualificationArgs(Array(