Skip to content

Commit

Permalink
Add flatten as a supported expression (#587)
Browse files Browse the repository at this point in the history
Signed-off-by: cindyyuanjiang <[email protected]>
  • Loading branch information
cindyyuanjiang authored Sep 26, 2023
1 parent 8137d8f commit 3610023
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 1 deletion.
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-databricks-aws.csv
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Exp,2.45
Explode,2.45
Expm1,2.45
First,2.45
Flatten,2.45
Floor,2.45
FromUTCTimestamp,2.45
FromUnixTime,2.45
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Exp,2.73
Explode,2.73
Expm1,2.73
First,2.73
Flatten,2.73
Floor,2.73
FromUTCTimestamp,2.73
FromUnixTime,2.73
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-l4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Exp,4.16
Explode,4.16
Expm1,4.16
First,4.16
Flatten,4.16
Floor,4.16
FromUTCTimestamp,4.16
FromUnixTime,4.16
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-t4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Exp,4.88
Explode,4.88
Expm1,4.88
First,4.88
Flatten,4.88
Floor,4.88
FromUTCTimestamp,4.88
FromUnixTime,4.88
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-a10.csv
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Exp,2.59
Explode,2.59
Expm1,2.59
First,2.59
Flatten,2.59
Floor,2.59
FromUTCTimestamp,2.59
FromUnixTime,2.59
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-t4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Exp,2.07
Explode,2.07
Expm1,2.07
First,2.07
Flatten,2.07
Floor,2.07
FromUTCTimestamp,2.07
FromUnixTime,2.07
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore.csv
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ Exp,4
Explode,4
Expm1,4
First,4
Flatten,4
Floor,4
FromUTCTimestamp,4
FromUnixTime,4
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/resources/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ Expm1,S,`expm1`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,N
Expm1,S,`expm1`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Expm1,S,`expm1`,None,AST,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Expm1,S,`expm1`,None,AST,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Flatten,S,`flatten`,None,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
Flatten,S,`flatten`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
Floor,S,`floor`,None,project,input,NA,NA,NA,NA,S,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA
Floor,S,`floor`,None,project,result,NA,NA,NA,NA,S,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA
FromUTCTimestamp,S,`from_utc_timestamp`,None,project,timestamp,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.scalatest.exceptions.TestFailedException

import org.apache.spark.sql.TrampolineUtil
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{ceil, col, collect_list, count, explode, floor, hex, json_tuple, round, row_number, sum, translate}
import org.apache.spark.sql.functions.{ceil, col, collect_list, count, explode, flatten, floor, hex, json_tuple, round, row_number, sum, translate}
import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil
Expand Down Expand Up @@ -976,6 +976,36 @@ class SQLPlanParserSuite extends BaseTestSuite {
}
}

test("flatten is supported in ProjectExec") {
  // Two temp dirs: one holds the parquet round-trip data, the other the generated event log.
  TrampolineUtil.withTempDir { parquetDir =>
    TrampolineUtil.withTempDir { logDir =>
      val parquetPath = s"$parquetDir/testtext"
      val (eventLog, _) =
        ToolTestUtils.generateEventLog(logDir, "ProjectExprsSupported") { spark =>
          import spark.implicits._
          // Round-trip the nested array through parquet so the plan has a ProjectExec
          // instead of a LocalTableScan.
          Seq(Seq(Seq(1, 2), Seq(3, 4))).toDF("value").write.parquet(parquetPath)
          val nested = spark.read.parquet(parquetPath)
          // The flatten call is expected to show up inside the ProjectExec.
          nested.select(flatten(nested("value")))
        }

      val checker = new PluginTypeChecker()
      val app = createAppFromEventlog(eventLog)
      assert(app.sqlPlans.size == 2)

      val parsedPlans = app.sqlPlans.map { case (sqlID, plan) =>
        SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", checker, app)
      }
      val execInfos = getAllExecsFromPlan(parsedPlans.toSeq)

      // Exactly one WholeStageCodegen node is expected, and it must carry a duration.
      val codegenStages = execInfos.filter(_.exec.contains("WholeStageCodegen"))
      assert(codegenStages.size == 1)
      assert(codegenStages.forall(_.duration.nonEmpty))

      // The single Project child of that stage must be reported as supported.
      val projectExecs =
        codegenStages.flatMap(_.children).flatten.filter(_.exec == "Project")
      assertSizeAndSupported(1, projectExecs)
    }
  }
}

test("Parse SQL function Name in HashAggregateExec") {
TrampolineUtil.withTempDir { eventLogDir =>
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "sqlmetric") { spark =>
Expand Down
1 change: 1 addition & 0 deletions user_tools/custom_speedup_factors/operatorsList.csv
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Exp
Explode
Expm1
First
Flatten
Floor
FromUTCTimestamp
FromUnixTime
Expand Down

0 comments on commit 3610023

Please sign in to comment.