From 3610023f602db6ed2944ac5323a2fa10b692c23b Mon Sep 17 00:00:00 2001 From: Cindy Jiang <47068112+cindyyuanjiang@users.noreply.github.com> Date: Tue, 26 Sep 2023 11:56:07 -0700 Subject: [PATCH] added flatten as supported expr (#587) Signed-off-by: cindyyuanjiang --- .../operatorsScore-databricks-aws.csv | 1 + .../operatorsScore-databricks-azure.csv | 1 + .../resources/operatorsScore-dataproc-l4.csv | 1 + .../resources/operatorsScore-dataproc-t4.csv | 1 + .../main/resources/operatorsScore-emr-a10.csv | 1 + .../main/resources/operatorsScore-emr-t4.csv | 1 + core/src/main/resources/operatorsScore.csv | 1 + core/src/main/resources/supportedExprs.csv | 2 ++ .../tool/planparser/SqlPlanParserSuite.scala | 32 ++++++++++++++++++- .../custom_speedup_factors/operatorsList.csv | 1 + 10 files changed, 41 insertions(+), 1 deletion(-) diff --git a/core/src/main/resources/operatorsScore-databricks-aws.csv b/core/src/main/resources/operatorsScore-databricks-aws.csv index 06a6b7d2d..573b4c57b 100644 --- a/core/src/main/resources/operatorsScore-databricks-aws.csv +++ b/core/src/main/resources/operatorsScore-databricks-aws.csv @@ -102,6 +102,7 @@ Exp,2.45 Explode,2.45 Expm1,2.45 First,2.45 +Flatten,2.45 Floor,2.45 FromUTCTimestamp,2.45 FromUnixTime,2.45 diff --git a/core/src/main/resources/operatorsScore-databricks-azure.csv b/core/src/main/resources/operatorsScore-databricks-azure.csv index def5b1578..6acf4c295 100644 --- a/core/src/main/resources/operatorsScore-databricks-azure.csv +++ b/core/src/main/resources/operatorsScore-databricks-azure.csv @@ -102,6 +102,7 @@ Exp,2.73 Explode,2.73 Expm1,2.73 First,2.73 +Flatten,2.73 Floor,2.73 FromUTCTimestamp,2.73 FromUnixTime,2.73 diff --git a/core/src/main/resources/operatorsScore-dataproc-l4.csv b/core/src/main/resources/operatorsScore-dataproc-l4.csv index 1231b196a..a3d9aeba5 100644 --- a/core/src/main/resources/operatorsScore-dataproc-l4.csv +++ b/core/src/main/resources/operatorsScore-dataproc-l4.csv @@ -102,6 +102,7 @@ Exp,4.16 Explode,4.16 Expm1,4.16 First,4.16 +Flatten,4.16 Floor,4.16 FromUTCTimestamp,4.16 FromUnixTime,4.16 diff --git a/core/src/main/resources/operatorsScore-dataproc-t4.csv b/core/src/main/resources/operatorsScore-dataproc-t4.csv index d91bc9827..20e8cd820 100644 --- a/core/src/main/resources/operatorsScore-dataproc-t4.csv +++ b/core/src/main/resources/operatorsScore-dataproc-t4.csv @@ -102,6 +102,7 @@ Exp,4.88 Explode,4.88 Expm1,4.88 First,4.88 +Flatten,4.88 Floor,4.88 FromUTCTimestamp,4.88 FromUnixTime,4.88 diff --git a/core/src/main/resources/operatorsScore-emr-a10.csv b/core/src/main/resources/operatorsScore-emr-a10.csv index 3c58aa652..ea283c9d8 100644 --- a/core/src/main/resources/operatorsScore-emr-a10.csv +++ b/core/src/main/resources/operatorsScore-emr-a10.csv @@ -102,6 +102,7 @@ Exp,2.59 Explode,2.59 Expm1,2.59 First,2.59 +Flatten,2.59 Floor,2.59 FromUTCTimestamp,2.59 FromUnixTime,2.59 diff --git a/core/src/main/resources/operatorsScore-emr-t4.csv b/core/src/main/resources/operatorsScore-emr-t4.csv index 3630cb5d2..9b517152d 100644 --- a/core/src/main/resources/operatorsScore-emr-t4.csv +++ b/core/src/main/resources/operatorsScore-emr-t4.csv @@ -102,6 +102,7 @@ Exp,2.07 Explode,2.07 Expm1,2.07 First,2.07 +Flatten,2.07 Floor,2.07 FromUTCTimestamp,2.07 FromUnixTime,2.07 diff --git a/core/src/main/resources/operatorsScore.csv b/core/src/main/resources/operatorsScore.csv index b851700b7..5d4a022e1 100644 --- a/core/src/main/resources/operatorsScore.csv +++ b/core/src/main/resources/operatorsScore.csv @@ -107,6 +107,7 @@ Exp,4 Explode,4 Expm1,4 First,4 +Flatten,4 Floor,4 FromUTCTimestamp,4 FromUnixTime,4 diff --git a/core/src/main/resources/supportedExprs.csv b/core/src/main/resources/supportedExprs.csv index 1de488209..3b335fd8f 100644 --- a/core/src/main/resources/supportedExprs.csv +++ b/core/src/main/resources/supportedExprs.csv @@ -202,6 +202,8 @@ Expm1,S,`expm1`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,N Expm1,S,`expm1`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Expm1,S,`expm1`,None,AST,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Expm1,S,`expm1`,None,AST,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA +Flatten,S,`flatten`,None,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA +Flatten,S,`flatten`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA Floor,S,`floor`,None,project,input,NA,NA,NA,NA,S,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA Floor,S,`floor`,None,project,result,NA,NA,NA,NA,S,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA FromUTCTimestamp,S,`from_utc_timestamp`,None,project,timestamp,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index a5a7c8262..6772e6140 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -30,7 +30,7 @@ import org.scalatest.exceptions.TestFailedException import org.apache.spark.sql.TrampolineUtil import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions.{ceil, col, collect_list, count, explode, floor, hex, json_tuple, round, row_number, sum, translate} +import org.apache.spark.sql.functions.{ceil, col, collect_list, count, explode, flatten, floor, hex, json_tuple, round, row_number, sum, translate} import org.apache.spark.sql.rapids.tool.ToolUtils import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil @@ -976,6 +976,36 @@ class SQLPlanParserSuite extends BaseTestSuite { } } + test("flatten is supported in ProjectExec") { + TrampolineUtil.withTempDir { parquetoutputLoc => + TrampolineUtil.withTempDir { eventLogDir => + val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, + "ProjectExprsSupported") { spark => + import spark.implicits._ + val df1 = Seq(Seq(Seq(1, 2), Seq(3, 4))).toDF("value") + // write df1 to parquet to transform LocalTableScan to ProjectExec + df1.write.parquet(s"$parquetoutputLoc/testtext") + val df2 = spark.read.parquet(s"$parquetoutputLoc/testtext") + // flatten should be part of ProjectExec + df2.select(flatten(df2("value"))) + } + val pluginTypeChecker = new PluginTypeChecker() + val app = createAppFromEventlog(eventLog) + assert(app.sqlPlans.size == 2) + val parsedPlans = app.sqlPlans.map { case (sqlID, plan) => + SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app) + } + val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq) + val wholeStages = allExecInfo.filter(_.exec.contains("WholeStageCodegen")) + assert(wholeStages.size == 1) + assert(wholeStages.forall(_.duration.nonEmpty)) + val allChildren = wholeStages.flatMap(_.children).flatten + val projects = allChildren.filter(_.exec == "Project") + assertSizeAndSupported(1, projects) + } + } + } + test("Parse SQL function Name in HashAggregateExec") { TrampolineUtil.withTempDir { eventLogDir => val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "sqlmetric") { spark => diff --git a/user_tools/custom_speedup_factors/operatorsList.csv b/user_tools/custom_speedup_factors/operatorsList.csv index afdd815df..d382da6e7 100644 --- a/user_tools/custom_speedup_factors/operatorsList.csv +++ b/user_tools/custom_speedup_factors/operatorsList.csv @@ -95,6 +95,7 @@ Exp Explode Expm1 First +Flatten Floor FromUTCTimestamp FromUnixTime