Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support in core tools for running qualification on Dataproc GKE #613

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/docs/spark-qualification-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ applicable environments. Here are the cluster information for the ETL benchmark
| Dataproc (T4) | 4x n1-standard-32 | 4x n1-standard-32 + 8x T4 16GB |
| Dataproc (L4) | 8x n1-standard-16 | 8x g2-standard-16 |
| Dataproc Serverless (L4) | 8x 16 cores | 8x 16 cores + 8x L4 24GB |
| Dataproc GKE (T4) | 8x n1-standard-32 | 8x n1-standard-32 + 8x T4 16GB |
| EMR (T4) | 8x m5d.8xlarge | 4x g4dn.12xlarge |
| EMR (A10) | 8x m5d.8xlarge | 8x g5.8xlarge |
| Databricks AWS | 8x m6gd.8xlage | 8x g5.8xlarge |
Expand Down Expand Up @@ -248,8 +249,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
-p, --per-sql Report at the individual SQL query level.
--platform <arg> Cluster platform where Spark CPU workloads were
executed. Options include onprem, dataproc-t4,
dataproc-l4, dataproc-serverless-l4, emr-t4,
emr-a10, databricks-aws, and databricks-azure.
dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4,
emr-t4, emr-a10, databricks-aws, and databricks-azure.
Default is onprem.
-r, --report-read-schema Whether to output the read formats and
datatypes to the CSV file. This can be very
Expand Down
256 changes: 256 additions & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
CPUOperator,Score
CoalesceExec,3.65
CollectLimitExec,3.65
ExpandExec,3.76
FileSourceScanExec,2.84
FilterExec,3.79
GenerateExec,3.65
GlobalLimitExec,3.65
LocalLimitExec,3.65
ProjectExec,3.65
RangeExec,3.65
SampleExec,3.65
SortExec,3.65
TakeOrderedAndProjectExec,3.65
HashAggregateExec,4.1
ObjectHashAggregateExec,4.1
SortAggregateExec,4.1
DataWritingCommandExec,3.65
ExecutedCommandExec,3.65
BatchScanExec,2.84
ShuffleExchangeExec,3.69
BroadcastHashJoinExec,3.72
BroadcastNestedLoopJoinExec,1.66
CartesianProductExec,3.65
ShuffledHashJoinExec,3.65
SortMergeJoinExec,5.64
WindowExec,3.65
Abs,3.65
Acos,3.65
Acosh,3.65
Add,3.65
AggregateExpression,3.65
Alias,3.65
And,3.65
ApproximatePercentile,3.65
ArrayContains,3.65
ArrayExcept,3.65
ArrayExists,3.65
ArrayIntersect,3.65
ArrayMax,3.65
ArrayMin,3.65
ArrayRemove,3.65
ArrayRepeat,3.65
ArrayTransform,3.65
ArrayUnion,3.65
ArraysOverlap,3.65
ArraysZip,3.65
Asin,3.65
Asinh,3.65
AtLeastNNonNulls,3.65
Atan,3.65
Atanh,3.65
AttributeReference,3.65
Average,3.65
BRound,3.65
BitLength,3.65
BitwiseAnd,3.65
BitwiseNot,3.65
BitwiseOr,3.65
BitwiseXor,3.65
CaseWhen,3.65
Cbrt,3.65
Ceil,3.65
CheckOverflow,3.65
Coalesce,3.65
CollectList,3.65
CollectSet,3.65
Concat,3.65
ConcatWs,3.65
Contains,3.65
Conv,3.65
Cos,3.65
Cosh,3.65
Cot,3.65
Count,3.65
CreateArray,3.65
CreateMap,3.65
CreateNamedStruct,3.65
CurrentRow$,3.65
DateAdd,3.65
DateAddInterval,3.65
DateDiff,3.65
DateFormatClass,3.65
DateSub,3.65
DayOfMonth,3.65
DayOfWeek,3.65
DayOfYear,3.65
DenseRank,3.65
Divide,3.65
DynamicPruningExpression,3.65
ElementAt,3.65
EndsWith,3.65
EqualNullSafe,3.65
EqualTo,3.65
Exp,3.65
Explode,3.65
Expm1,3.65
First,3.65
Flatten,3.65
Floor,3.65
FromUTCTimestamp,3.65
FromUnixTime,3.65
GetArrayItem,3.65
GetArrayStructFields,3.65
GetJsonObject,3.65
GetMapValue,3.65
GetStructField,3.65
GetTimestamp,3.65
GreaterThan,3.65
GreaterThanOrEqual,3.65
Greatest,3.65
HiveGenericUDF,3.65
HiveSimpleUDF,3.65
Hour,3.65
Hypot,3.65
If,3.65
In,3.65
InSet,3.65
InitCap,3.65
InputFileBlockLength,3.65
InputFileBlockStart,3.65
InputFileName,3.65
IntegralDivide,3.65
IsNaN,3.65
IsNotNull,3.65
IsNull,3.65
JsonToStructs,3.65
JsonTuple,3.65
KnownFloatingPointNormalized,3.65
KnownNotNull,3.65
Lag,3.65
LambdaFunction,3.65
Last,3.65
LastDay,3.65
Lead,3.65
Least,3.65
Length,3.65
LessThan,3.65
LessThanOrEqual,3.65
Like,3.65
Literal,3.65
Log,3.65
Log10,3.65
Log1p,3.65
Log2,3.65
Logarithm,3.65
Lower,3.65
MakeDecimal,3.65
MapConcat,3.65
MapEntries,3.65
MapFilter,3.65
MapKeys,3.65
MapValues,3.65
Max,3.65
Md5,3.65
MicrosToTimestamp,3.65
MillisToTimestamp,3.65
Min,3.65
Minute,3.65
MonotonicallyIncreasingID,3.65
Month,3.65
Multiply,3.65
Murmur3Hash,3.65
NaNvl,3.65
NamedLambdaVariable,3.65
NormalizeNaNAndZero,3.65
Not,3.65
NthValue,3.65
OctetLength,3.65
Or,3.65
PercentRank,3.65
PivotFirst,3.65
Pmod,3.65
PosExplode,3.65
Pow,3.65
PreciseTimestampConversion,3.65
PromotePrecision,3.65
PythonUDF,3.65
Quarter,3.65
RLike,3.65
RaiseError,3.65
Rand,3.65
Rank,3.65
RegExpExtract,3.65
RegExpExtractAll,3.65
RegExpReplace,3.65
Remainder,3.65
ReplicateRows,3.65
Reverse,3.65
Rint,3.65
Round,3.65
RowNumber,3.65
ScalaUDF,3.65
ScalarSubquery,3.65
Second,3.65
SecondsToTimestamp,3.65
Sequence,3.65
ShiftLeft,3.65
ShiftRight,3.65
ShiftRightUnsigned,3.65
Signum,3.65
Sin,3.65
Sinh,3.65
Size,3.65
SortArray,3.65
SortOrder,3.65
SparkPartitionID,3.65
SpecifiedWindowFrame,3.65
Sqrt,3.65
StartsWith,3.65
StddevPop,3.65
StddevSamp,3.65
StringInstr,3.65
StringLPad,3.65
StringLocate,3.65
StringRPad,3.65
StringRepeat,3.65
StringReplace,3.65
StringSplit,3.65
StringToMap,3.65
StringTranslate,3.65
StringTrim,3.65
StringTrimLeft,3.65
StringTrimRight,3.65
Substring,3.65
SubstringIndex,3.65
Subtract,3.65
Sum,3.65
Tan,3.65
Tanh,3.65
TimeAdd,3.65
ToDegrees,3.65
ToRadians,3.65
ToUnixTimestamp,3.65
TransformKeys,3.65
TransformValues,3.65
UnaryMinus,3.65
UnaryPositive,3.65
UnboundedFollowing$,3.65
UnboundedPreceding$,3.65
UnixTimestamp,3.65
UnscaledValue,3.65
Upper,3.65
VariancePop,3.65
VarianceSamp,3.65
WeekDay,3.65
WindowExpression,3.65
WindowSpecDefinition,3.65
XxHash64,3.65
Year,3.65
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
FlatMapCoGroupsInPandasExec,1.2
MapInPandasExec,1.2
WindowInPandasExec,1.2
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class PluginTypeChecker(platform: String = "onprem",
private val OPERATORS_SCORE_FILE_DATAPROC_T4 = "operatorsScore-dataproc-t4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_GKE_T4 = "operatorsScore-dataproc-gke-t4.csv"
private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv"
private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv"
private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv"
Expand Down Expand Up @@ -104,6 +105,7 @@ class PluginTypeChecker(platform: String = "onprem",
case "dataproc-t4" | "dataproc" => OPERATORS_SCORE_FILE_DATAPROC_T4
case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4
case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4
case "dataproc-gke-t4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_T4
// if no GPU specified, then default to emr-t4 for backward compatibility
case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4
case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
val platform: ScallopOption[String] =
opt[String](required = false,
descr = "Cluster platform where Spark CPU workloads were executed. Options include " +
"onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, emr-t4, emr-a10, " +
"databricks-aws, and databricks-azure. " +
"Default is onprem.",
"onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, emr-t4, " +
"emr-a10, databricks-aws, and databricks-azure. Default is onprem.",
default = Some("onprem"))
val speedupFactorFile: ScallopOption[String] =
opt[String](required = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,19 @@ class PluginTypeCheckerSuite extends FunSuite with Logging {
assert(checker.getSpeedupFactor("WindowExec") == 4.25)
assert(checker.getSpeedupFactor("Ceil") == 4.25)
}

test("supported operator score from dataproc-l4") {
val checker = new PluginTypeChecker("dataproc-l4")
assert(checker.getSpeedupFactor("UnionExec") == 4.16)
assert(checker.getSpeedupFactor("Ceil") == 4.16)
}

test("supported operator score from dataproc-gke-t4") {
val checker = new PluginTypeChecker("dataproc-gke-t4")
assert(checker.getSpeedupFactor("WindowExec") == 3.65)
assert(checker.getSpeedupFactor("Ceil") == 3.65)
}

test("supported operator score from emr-a10") {
val checker = new PluginTypeChecker("emr-a10")
assert(checker.getSpeedupFactor("UnionExec") == 2.59)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,30 @@ class QualificationSuite extends BaseTestSuite {
assert(outputActual.collect().size == 1)
}

// run the qualification tool for dataproc-gke-t4
TrampolineUtil.withTempDir { outpath =>
val appArgs = new QualificationArgs(Array(
"--output-directory",
outpath.getAbsolutePath,
"--platform",
"dataproc-gke-t4",
eventLog))

val (exit, _) =
QualificationMain.mainInternal(appArgs)
assert(exit == 0)

// the code above that runs the Spark query stops the Sparksession
// so create a new one to read in the csv file
createSparkSession()

// validate that the SQL description in the csv file escapes commas properly
val outputResults = s"$outpath/rapids_4_spark_qualification_output/" +
s"rapids_4_spark_qualification_output.csv"
val outputActual = readExpectedFile(new File(outputResults))
assert(outputActual.collect().size == 1)
}

// run the qualification tool for databricks-aws
TrampolineUtil.withTempDir { outpath =>
val appArgs = new QualificationArgs(Array(
Expand Down
Loading