From 019ede2fdc87f109f895ce67161c506dd377d80a Mon Sep 17 00:00:00 2001 From: Niranjan Artal <50492963+nartal1@users.noreply.github.com> Date: Wed, 6 Nov 2024 09:20:27 -0800 Subject: [PATCH] Refactor Exec Parsers - remove individual parser classes (#1396) * Refactor Exec Parsers - unify the code calling similar methods This PR is to remove redundant code in many ExecParsers. We have several ExecParsers with the same functionality in parse code. In this PR, following changes are done: 1. If the Execs don't have expressions, then execName is assigned and ExecInfo object is created from GenericExecParser. 2. If the Execs have expressions then reflection is used to assign the appropriate parser to be used. 3. If there is any additional calculation in the parse function, then only that part is overriden in the ExecParser and the rest of the function uses the GenereicExecParser base code 4.Removed all the files which are not needed anymore Signed-off-by: Niranjan Artal * Remove reflection instead use case classes to match and call respective Parsers Signed-off-by: Niranjan Artal * addressed few review comments 1. default Exec name 2. added argument to GenericParser constructor * addressed review comments * Update tests --------- Signed-off-by: Niranjan Artal --- .../AggregateInPandasExecParser.scala | 41 ------- .../ArrowEvalPythonExecParser.scala | 41 ------- .../CartesianProductExecParser.scala | 41 ------- .../tool/planparser/CoalesceExecParser.scala | 42 ------- .../planparser/CollectLimitExecParser.scala | 41 ------- .../CustomShuffleReaderExecParser.scala | 42 ------- .../tool/planparser/ExpandExecParser.scala | 46 ------- .../tool/planparser/FilterExecParser.scala | 46 ------- .../FlatMapGroupsInPandasExecParser.scala | 41 ------- .../tool/planparser/GenerateExecParser.scala | 46 ------- .../tool/planparser/GenericExecParser.scala | 114 ++++++++++++++++++ .../planparser/GlobalLimitExecParser.scala | 41 ------- .../planparser/HashAggregateExecParser.scala | 31 ++--- .../InMemoryTableScanExecParser.scala | 40 ------ .../planparser/LocalLimitExecParser.scala | 41 ------- .../planparser/MapInPandasExecParser.scala | 41 ------- .../ObjectHashAggregateExecParser.scala | 30 ++--- .../tool/planparser/ProjectExecParser.scala | 46 ------- .../PythonMapInArrowExecParser.scala | 41 ------- .../tool/planparser/RangeExecParser.scala | 41 ------- .../tool/planparser/SQLPlanParser.scala | 64 ++++------ .../tool/planparser/SampleExecParser.scala | 41 ------- .../planparser/SortAggregateExecParser.scala | 46 ------- .../tool/planparser/SortExecParser.scala | 46 ------- .../TakeOrderedAndProjectExecParser.scala | 46 ------- .../tool/planparser/UnionExecParser.scala | 41 ------- .../tool/planparser/WindowExecParser.scala | 46 ------- .../planparser/WindowInPandasExecParser.scala | 41 ------- .../tool/planparser/SqlPlanParserSuite.scala | 6 +- 29 files changed, 155 insertions(+), 1115 deletions(-) delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala deleted file mode 100644 index 83a2f29f4..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class AggregateInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // AggregateInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala deleted file mode 100644 index 185c8d515..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ArrowEvalPythonExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // ArrowEvalPythonExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala deleted file mode 100644 index 728600132..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CartesianProductExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // CartesianProduct doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala deleted file mode 100644 index c57c7a87b..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CoalesceExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // coalesce doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala deleted file mode 100644 index ce74630fb..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CollectLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // collect doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala deleted file mode 100644 index 44f5443ff..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CustomShuffleReaderExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - // note this is called either AQEShuffleRead and CustomShuffleReader depending - // on the Spark version, our supported ops list it as CustomShuffleReader - val fullExecName = "CustomShuffleReaderExec" - - override def parse: ExecInfo = { - // doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala deleted file mode 100644 index 78f860dfa..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ExpandExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Expand doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Expand ", "") - val expressions = SQLPlanParser.parseExpandExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala deleted file mode 100644 index 7b30342ec..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class FilterExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // filter doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Filter ", "") - val expressions = SQLPlanParser.parseFilterExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala deleted file mode 100644 index ab607e863..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class FlatMapGroupsInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // FlatMapCoGroupsInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala deleted file mode 100644 index 6a52dae47..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class GenerateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Generate doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Generate ", "") - val expressions = SQLPlanParser.parseGenerateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala new file mode 100644 index 000000000..cd67502c5 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker + +import org.apache.spark.sql.execution.ui.SparkPlanGraphNode +import org.apache.spark.sql.rapids.tool.{AppBase, UnsupportedExpr} + +class GenericExecParser( + val node: SparkPlanGraphNode, + val checker: PluginTypeChecker, + val sqlID: Long, + val execName: Option[String] = None, + val expressionFunction: Option[String => Array[String]] = None, + val app: Option[AppBase] = None +) extends ExecParser { + + val fullExecName: String = execName.getOrElse(node.name + "Exec") + + override def parse: ExecInfo = { + val duration = computeDuration + val expressions = parseExpressions() + val notSupportedExprs = getNotSupportedExprs(expressions) + val isExecSupported = checker.isExecSupported(fullExecName) && + notSupportedExprs.isEmpty + + val (speedupFactor, isSupported) = if (isExecSupported) { + (checker.getSpeedupFactor(fullExecName), true) + } else { + (1.0, false) + } + + createExecInfo(speedupFactor, isSupported, duration, notSupportedExprs) + } + + protected def parseExpressions(): Array[String] = { + expressionFunction match { + case Some(func) => + val exprString = getExprString + func(exprString) + case None => + Array.empty[String] + } + } + + protected def getExprString: String = { + node.desc.replaceFirst(s"${node.name} ", "") + } + + protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExpr] = { + checker.getNotSupportedExprs(expressions) + } + + protected def getDurationMetricIds: Seq[Long] = { + Seq.empty + } + + protected def computeDuration: Option[Long] = { + // Sum the durations for all metrics returned by getDurationMetricIds + val durations = getDurationMetricIds.flatMap { metricId => + app.flatMap(appInstance => SQLPlanParser.getTotalDuration(Some(metricId), appInstance)) + } + durations.reduceOption(_ + _) + } + + protected def createExecInfo( + speedupFactor: Double, + isSupported: Boolean, + duration: Option[Long], + notSupportedExprs: Seq[UnsupportedExpr] + ): ExecInfo = { + ExecInfo( + node, + sqlID, + node.name, + "", + speedupFactor, + duration, + node.id, + isSupported, + None, + unsupportedExprs = notSupportedExprs + ) + } +} + +object GenericExecParser { + def apply( + node: SparkPlanGraphNode, + checker: PluginTypeChecker, + sqlID: Long, + execName: Option[String] = None, + expressionFunction: Option[String => Array[String]] = None, + app: Option[AppBase] = None + ): GenericExecParser = { + val fullExecName = execName.getOrElse(node.name + "Exec") + new GenericExecParser(node, checker, sqlID, Some(fullExecName), expressionFunction, app) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala deleted file mode 100644 index 931f7375b..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class GlobalLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // GlobalLimit doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala index 3676c81a7..9dc5e0d1c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala @@ -23,30 +23,13 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode import org.apache.spark.sql.rapids.tool.AppBase case class HashAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long, - app: AppBase) extends ExecParser with Logging { + override val node: SparkPlanGraphNode, + override val checker: PluginTypeChecker, + override val sqlID: Long, + appBase: AppBase) extends + GenericExecParser(node, checker, sqlID, app = Some(appBase)) with Logging { - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TODO - Its partial duration only. We need a way to specify it as partial. - val accumId = node.metrics.find( - _.name == "time in aggregation build total").map(_.accumulatorId) - val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) - val exprString = node.desc.replaceFirst("HashAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, maxDuration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) + override def getDurationMetricIds: Seq[Long] = { + node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId).toSeq } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala deleted file mode 100644 index d011ee46e..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class InMemoryTableScanExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - val duration = None - val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala deleted file mode 100644 index 6be27aa22..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class LocalLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // LocalLimit doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala deleted file mode 100644 index f9d57e313..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class MapInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // MapInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala index 8632b2d45..c66b60f2f 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala @@ -23,29 +23,13 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode import org.apache.spark.sql.rapids.tool.AppBase case class ObjectHashAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long, - app: AppBase) extends ExecParser with Logging { + override val node: SparkPlanGraphNode, + override val checker: PluginTypeChecker, + override val sqlID: Long, + appBase: AppBase) extends + GenericExecParser(node, checker, sqlID, app = Some(appBase)) with Logging { - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TODO - Its partial duration only. We need a way to specify it as partial. - val accumId = node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId) - val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) - val exprString = node.desc.replaceFirst("ObjectHashAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - maxDuration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) + override def getDurationMetricIds: Seq[Long] = { + node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId).toSeq } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala deleted file mode 100644 index 838c1c6d2..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ProjectExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Project doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Project ", "") - val expressions = SQLPlanParser.parseProjectExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala deleted file mode 100644 index 08f8eedef..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class PythonMapInArrowExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName: String = node.name + "Exec" - - override def parse: ExecInfo = { - // PythonMapInArrow doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala deleted file mode 100644 index 32b3ae4fb..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class RangeExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // range doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 47d42d102..c4b31f78d 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -26,6 +26,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlanInfo +//import org.apache.spark.sql.execution.joins.CartesianProductExec import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode} import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils, UnsupportedExpr} import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph @@ -463,10 +464,13 @@ object SQLPlanParser extends Logging { app: AppBase): ExecInfo = { val normalizedNodeName = node.name.stripSuffix("$") normalizedNodeName match { - case "AggregateInPandas" => - AggregateInPandasExecParser(node, checker, sqlID).parse - case "ArrowEvalPython" => - ArrowEvalPythonExecParser(node, checker, sqlID).parse + // Generalize all the execs that call GenericExecParser in one case + case "AggregateInPandas" | "ArrowEvalPython" | "AQEShuffleRead" | "CartesianProduct" + | "Coalesce" | "CollectLimit" | "CustomShuffleReader" | "FlatMapGroupsInPandas" + | "GlobalLimit" | "LocalLimit" | "InMemoryTableScan" | "MapInPandas" + | "PythonMapInArrow" | "MapInArrow" | "Range" | "Sample" | "Union" + | "WindowInPandas" => + GenericExecParser(node, checker, sqlID, app = Some(app)).parse case "BatchScan" => BatchScanExecParser(node, checker, sqlID, app).parse case "BroadcastExchange" => @@ -475,54 +479,36 @@ object SQLPlanParser extends Logging { BroadcastHashJoinExecParser(node, checker, sqlID).parse case "BroadcastNestedLoopJoin" => BroadcastNestedLoopJoinExecParser(node, checker, sqlID).parse - case "CartesianProduct" => - CartesianProductExecParser(node, checker, sqlID).parse - case "Coalesce" => - CoalesceExecParser(node, checker, sqlID).parse - case "CollectLimit" => - CollectLimitExecParser(node, checker, sqlID).parse - case "CustomShuffleReader" | "AQEShuffleRead" => - CustomShuffleReaderExecParser(node, checker, sqlID).parse case "Exchange" => ShuffleExchangeExecParser(node, checker, sqlID, app).parse case "Expand" => - ExpandExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseExpandExpressions)).parse case "Filter" => - FilterExecParser(node, checker, sqlID).parse - case "FlatMapGroupsInPandas" => - FlatMapGroupsInPandasExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseFilterExpressions)).parse case "Generate" => - GenerateExecParser(node, checker, sqlID).parse - case "GlobalLimit" => - GlobalLimitExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseGenerateExpressions)).parse case "HashAggregate" => HashAggregateExecParser(node, checker, sqlID, app).parse - case "LocalLimit" => - LocalLimitExecParser(node, checker, sqlID).parse - case "InMemoryTableScan" => - InMemoryTableScanExecParser(node, checker, sqlID).parse case i if DataWritingCommandExecParser.isWritingCmdExec(i) => DataWritingCommandExecParser.parseNode(node, checker, sqlID) - case "MapInPandas" => - MapInPandasExecParser(node, checker, sqlID).parse case "ObjectHashAggregate" => ObjectHashAggregateExecParser(node, checker, sqlID, app).parse case "Project" => - ProjectExecParser(node, checker, sqlID).parse - case "PythonMapInArrow" | "MapInArrow" => - PythonMapInArrowExecParser(node, checker, sqlID).parse - case "Range" => - RangeExecParser(node, checker, sqlID).parse - case "Sample" => - SampleExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseProjectExpressions)).parse case "ShuffledHashJoin" => ShuffledHashJoinExecParser(node, checker, sqlID, app).parse case "Sort" => - SortExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseSortExpressions)).parse case s if ReadParser.isScanNode(s) => FileSourceScanExecParser(node, checker, sqlID, app).parse case "SortAggregate" => - SortAggregateExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseAggregateExpressions)).parse case smj if SortMergeJoinExecParser.accepts(smj) => SortMergeJoinExecParser(node, checker, sqlID).parse case "SubqueryBroadcast" => @@ -530,13 +516,11 @@ object SQLPlanParser extends Logging { case sqe if SubqueryExecParser.accepts(sqe) => SubqueryExecParser.parseNode(node, checker, sqlID, app) case "TakeOrderedAndProject" => - TakeOrderedAndProjectExecParser(node, checker, sqlID).parse - case "Union" => - UnionExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseTakeOrderedExpressions)).parse case "Window" => - WindowExecParser(node, checker, sqlID).parse - case "WindowInPandas" => - WindowInPandasExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseWindowExpressions)).parse case "WindowGroupLimit" => WindowGroupLimitParser(node, checker, sqlID).parse case wfe if WriteFilesExecParser.accepts(wfe) => diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala deleted file mode 100644 index 44f37b495..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SampleExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // sample doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala deleted file mode 100644 index cc56b4c88..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SortAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // SortAggregate doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("SortAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala deleted file mode 100644 index c89ca319e..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SortExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Sort doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Sort ", "") - val expressions = SQLPlanParser.parseSortExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala deleted file mode 100644 index 19f64d210..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class TakeOrderedAndProjectExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TakeOrderedAndProject doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("TakeOrderedAndProject ", "") - val expressions = SQLPlanParser.parseTakeOrderedExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala deleted file mode 100644 index 126d0f2b5..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class UnionExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Union doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala deleted file mode 100644 index a48b51c82..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class WindowExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Window doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Window ", "") - val expressions = SQLPlanParser.parseWindowExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala deleted file mode 100644 index 0b92986d6..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class WindowInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // WindowInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index 935440ea6..a3d6211bf 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -254,7 +254,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite { assert(wholeStages.forall(_.duration.nonEmpty)) val allChildren = wholeStages.flatMap(_.children).flatten val hashAggregate = allChildren.filter(_.exec == "HashAggregate") - assertSizeAndSupported(2, hashAggregate) + assertSizeAndSupported(2, hashAggregate, checkDurations = false) } } } @@ -1241,7 +1241,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite { } val execInfo = getAllExecsFromPlan(parsedPlans.toSeq) val hashAggregate = execInfo.filter(_.exec == "HashAggregate") - assertSizeAndSupported(2, hashAggregate) + assertSizeAndSupported(2, hashAggregate, checkDurations = false) } } @@ -1797,7 +1797,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite { // if the min_by and max_by were not recognized, the test would fail val hashAggExecs = getAllExecsFromPlan(parsedPlans.toSeq).filter(_.exec.equals("HashAggregate")) - assertSizeAndSupported(2, hashAggExecs) + assertSizeAndSupported(2, hashAggExecs, checkDurations = false) } }