diff --git a/core/src/main/resources/execParserMappings/execParser.json b/core/src/main/resources/execParserMappings/execParser.json new file mode 100644 index 000000000..dab562cab --- /dev/null +++ b/core/src/main/resources/execParserMappings/execParser.json @@ -0,0 +1,117 @@ +{ + "parsers": [ + { + "execName": "AggregateInPandas", + "parseExpressions": false + }, + { + "execName": "ArrowEvalPython", + "parseExpressions": false + }, + { + "execName": "CartesianProduct", + "parseExpressions": false + }, + { + "execName": "Coalesce", + "parseExpressions": false + }, + { + "execName": "CollectLimit", + "parseExpressions": false + }, + { + "execName": "CustomShuffleReader", + "parseExpressions": false + }, + { + "execName": "FlatMapGroupsInPandas", + "parseExpressions": false + }, + { + "execName": "InMemoryTableScan", + "parseExpressions": false + }, + { + "execName": "LocalLimit", + "parseExpressions": false + }, + { + "execName": "GlobalLimit", + "parseExpressions": false + }, + { + "execName": "MapInArrow", + "parseExpressions": false + }, + { + "execName": "MapInPandas", + "parseExpressions": false + }, + { + "execName": "PythonMapInArrow", + "parseExpressions": false + }, + { + "execName": "Range", + "parseExpressions": false + }, + { + "execName": "Sample", + "parseExpressions": false + }, + { + "execName": "Union", + "parseExpressions": false + }, + { + "execName": "WindowInPandas", + "parseExpressions": false + }, + { + "execName": "Expand", + "parseExpressions": true, + "expressionParserMethod": "parseExpandExpressions" + }, + { + "execName": "Filter", + "parseExpressions": true, + "expressionParserMethod": "parseFilterExpressions" + }, + { + "execName": "Generate", + "parseExpressions": true, + "expressionParserMethod": "parseGenerateExpressions" + }, + { + "execName": "ObjectHashAggregate", + "parseExpressions": true, + "expressionParserMethod": "parseAggregateExpressions" + }, + { + "execName": "Project", + "parseExpressions": true, + "expressionParserMethod": "parseProjectExpressions" + }, + { + "execName": "SortAggregate", + "parseExpressions": true, + "expressionParserMethod": "parseAggregateExpressions" + }, + { + "execName": "Sort", + "parseExpressions": true, + "expressionParserMethod": "parseSortExpressions" + }, + { + "execName": "TakeOrderedAndProject", + "parseExpressions": true, + "expressionParserMethod": "parseTakeOrderedExpressions" + }, + { + "execName": "Window", + "parseExpressions": true, + "expressionParserMethod": "parseWindowExpressions" + } + ] +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala deleted file mode 100644 index 83a2f29f4..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class AggregateInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // AggregateInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala deleted file mode 100644 index 185c8d515..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ArrowEvalPythonExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // ArrowEvalPythonExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala deleted file mode 100644 index 728600132..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CartesianProductExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // CartesianProduct doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala deleted file mode 100644 index c57c7a87b..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CoalesceExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // coalesce doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala deleted file mode 100644 index ce74630fb..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CollectLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // collect doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala index 44f5443ff..b70258b5a 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala @@ -21,22 +21,11 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.sql.execution.ui.SparkPlanGraphNode case class CustomShuffleReaderExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { + override val node: SparkPlanGraphNode, + override val checker: PluginTypeChecker, + override val sqlID: Long) extends GenericExecParser(node, checker,sqlID) { // note this is called either AQEShuffleRead and CustomShuffleReader depending // on the Spark version, our supported ops list it as CustomShuffleReader - val fullExecName = "CustomShuffleReaderExec" - - override def parse: ExecInfo = { - // doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } + override val fullExecName = "CustomShuffleReaderExec" } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParserLoader.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParserLoader.scala new file mode 100644 index 000000000..223c23ba8 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParserLoader.scala @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import java.nio.file.Paths + +import scala.collection.mutable +import scala.reflect.runtime.universe._ + +import org.json4s._ +import org.json4s.jackson.JsonMethods + +import org.apache.spark.sql.rapids.tool.util.UTF8Source + +object ReflectionCache { + private val methodCache = mutable.Map[String, MethodSymbol]() + + def getMethodSymbol(classType: Type, methodName: String): MethodSymbol = { + methodCache.getOrElseUpdate(methodName, { + classType.decl(TermName(methodName)).asMethod + }) + } +} + +object ExecParserLoader { + case class ParserConfig( + execName: String, + parseExpressions: Boolean, + expressionParserMethod: Option[MethodSymbol], + durationMetrics: Option[List[String]] + ) + + val EXEC_PARSER_DIRECTORY= "execParserMappings" + val DEFAULT_EXEC_PARSER_FILE = "execParser.json" + + implicit val formats: Formats = DefaultFormats + + lazy val mirror = runtimeMirror(getClass.getClassLoader) + lazy val sqlPlanParserModule = mirror.staticModule( + "com.nvidia.spark.rapids.tool.planparser.SQLPlanParser") + lazy val sqlPlanParserType = sqlPlanParserModule.moduleClass.asType.toType + + val mappingFile = Paths.get(EXEC_PARSER_DIRECTORY, DEFAULT_EXEC_PARSER_FILE).toString + val jsonString = UTF8Source.fromResource(mappingFile).mkString + val json = JsonMethods.parse(jsonString) + val parsersList = (json \ "parsers").extract[List[Map[String, Any]]] + + lazy val parsersConfig: Map[String, ParserConfig] = { + parsersList.map { parserMap => + val execName = parserMap("execName").asInstanceOf[String] + val parseExpressions = parserMap.get( + "parseExpressions").map(_.asInstanceOf[Boolean]).getOrElse(false) + val expressionParserMethodName = parserMap.get( + "expressionParserMethod").map(_.asInstanceOf[String]) + val expressionParserMethod = expressionParserMethodName.map { methodName => + ReflectionCache.getMethodSymbol(sqlPlanParserType, methodName) + } + val durationMetrics = parserMap.get("durationMetrics").map(_.asInstanceOf[List[String]]) + + execName -> ParserConfig( + execName, + parseExpressions, + expressionParserMethod, + durationMetrics + ) + }.toMap + } + + def getConfig(execName: String): Option[ParserConfig] = parsersConfig.get(execName) +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala deleted file mode 100644 index 78f860dfa..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ExpandExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Expand doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Expand ", "") - val expressions = SQLPlanParser.parseExpandExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala deleted file mode 100644 index 7b30342ec..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class FilterExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // filter doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Filter ", "") - val expressions = SQLPlanParser.parseFilterExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala deleted file mode 100644 index ab607e863..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class FlatMapGroupsInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // FlatMapCoGroupsInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala deleted file mode 100644 index 6a52dae47..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class GenerateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Generate doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Generate ", "") - val expressions = SQLPlanParser.parseGenerateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala new file mode 100644 index 000000000..43dd7ae65 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import scala.reflect.runtime.universe._ + +import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker + +import org.apache.spark.sql.execution.ui.SparkPlanGraphNode +import org.apache.spark.sql.rapids.tool.{AppBase, UnsupportedExpr} + +class GenericExecParser( + val node: SparkPlanGraphNode, + val checker: PluginTypeChecker, + val sqlID: Long, + val app: Option[AppBase] = None +) extends ExecParser { + + val fullExecName: String = node.name + "Exec" + val parserConfig: Option[ExecParserLoader.ParserConfig] = + ExecParserLoader.getConfig(node.name) + // Cache to store reflected methods to avoid redundant reflection calls + private val methodCache = scala.collection.mutable.Map[String, MethodMirror]() + + override def parse: ExecInfo = { + val duration = computeDuration + val expressions = parseExpressions() + val notSupportedExprs = getNotSupportedExprs(expressions) + val isExecSupported = checker.isExecSupported(fullExecName) && + notSupportedExprs.isEmpty && + isSupportedByDefault + + val (speedupFactor, isSupported) = if (isExecSupported) { + (checker.getSpeedupFactor(fullExecName), true) + } else { + (1.0, false) + } + + createExecInfo(speedupFactor, isSupported, duration, notSupportedExprs) + } + + protected def parseExpressions(): Array[String] = { + parserConfig match { + case Some(config) if config.parseExpressions => + val exprString = getExprString + val methodSymbol = config.expressionParserMethod.getOrElse { + throw new IllegalArgumentException( + s"Expression parser method not specified for ${node.name}") + } + invokeCachedParserMethod(methodSymbol, exprString) + + case _ => Array.empty[String] // Default behavior when parsing is not required + } + } + + // Helper method to invoke the parser method with caching + private def invokeCachedParserMethod( + methodSymbol: MethodSymbol, exprString: String): Array[String] = { + // This is to check if the method is already cached, else reflect and cache it + val cachedMethod = methodCache.getOrElseUpdate(methodSymbol.fullName, { + val mirror = runtimeMirror(getClass.getClassLoader) + val module = mirror.reflectModule(mirror.staticModule( + "com.nvidia.spark.rapids.tool.planparser.SQLPlanParser")) + val instanceMirror = mirror.reflect(module.instance) + instanceMirror.reflectMethod(methodSymbol) // Cache this reflected method + }) + + cachedMethod(exprString) match { + case expressions: Array[String] => expressions + case _ => throw new IllegalArgumentException( + s"Unexpected return type from method: ${methodSymbol.name}") + } + } + + protected def getExprString: String = { + node.desc.replaceFirst(s"${node.name} ", "") + } + + protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExpr] = { + checker.getNotSupportedExprs(expressions) + } + + // Compute duration based on the node metrics of that ExecNode + protected def computeDuration: Option[Long] = { + None + } + + protected def isSupportedByDefault: Boolean = true + + protected def createExecInfo( + speedupFactor: Double, + isSupported: Boolean, + duration: Option[Long], + notSupportedExprs: Seq[UnsupportedExpr] + ): ExecInfo = { + ExecInfo( + node, + sqlID, + node.name, + "", + speedupFactor, + duration, + node.id, + isSupported, + None, + unsupportedExprs = notSupportedExprs + ) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala deleted file mode 100644 index 931f7375b..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class GlobalLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // GlobalLimit doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala deleted file mode 100644 index d011ee46e..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class InMemoryTableScanExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - val duration = None - val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala deleted file mode 100644 index 6be27aa22..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class LocalLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // LocalLimit doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala deleted file mode 100644 index f9d57e313..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class MapInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // MapInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala index 8632b2d45..eae0564e3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala @@ -23,29 +23,13 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode import org.apache.spark.sql.rapids.tool.AppBase case class ObjectHashAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long, - app: AppBase) extends ExecParser with Logging { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TODO - Its partial duration only. We need a way to specify it as partial. + override val node: SparkPlanGraphNode, + override val checker: PluginTypeChecker, + override val sqlID: Long, + appParam: AppBase) extends + GenericExecParser(node, checker, sqlID) with Logging { + override def computeDuration: Option[Long] = { val accumId = node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId) - val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) - val exprString = node.desc.replaceFirst("ObjectHashAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - maxDuration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) + SQLPlanParser.getTotalDuration(accumId, appParam) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala deleted file mode 100644 index 838c1c6d2..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ProjectExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Project doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Project ", "") - val expressions = SQLPlanParser.parseProjectExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala deleted file mode 100644 index 08f8eedef..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class PythonMapInArrowExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName: String = node.name + "Exec" - - override def parse: ExecInfo = { - // PythonMapInArrow doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala deleted file mode 100644 index 32b3ae4fb..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class RangeExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // range doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 47d42d102..a8e481259 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -462,91 +462,49 @@ object SQLPlanParser extends Logging { checker: PluginTypeChecker, app: AppBase): ExecInfo = { val normalizedNodeName = node.name.stripSuffix("$") - normalizedNodeName match { - case "AggregateInPandas" => - AggregateInPandasExecParser(node, checker, sqlID).parse - case "ArrowEvalPython" => - ArrowEvalPythonExecParser(node, checker, sqlID).parse - case "BatchScan" => - BatchScanExecParser(node, checker, sqlID, app).parse - case "BroadcastExchange" => - BroadcastExchangeExecParser(node, checker, sqlID, app).parse - case "BroadcastHashJoin" => - BroadcastHashJoinExecParser(node, checker, sqlID).parse - case "BroadcastNestedLoopJoin" => - BroadcastNestedLoopJoinExecParser(node, checker, sqlID).parse - case "CartesianProduct" => - CartesianProductExecParser(node, checker, sqlID).parse - case "Coalesce" => - CoalesceExecParser(node, checker, sqlID).parse - case "CollectLimit" => - CollectLimitExecParser(node, checker, sqlID).parse - case "CustomShuffleReader" | "AQEShuffleRead" => - CustomShuffleReaderExecParser(node, checker, sqlID).parse - case "Exchange" => - ShuffleExchangeExecParser(node, checker, sqlID, app).parse - case "Expand" => - ExpandExecParser(node, checker, sqlID).parse - case "Filter" => - FilterExecParser(node, checker, sqlID).parse - case "FlatMapGroupsInPandas" => - FlatMapGroupsInPandasExecParser(node, checker, sqlID).parse - case "Generate" => - GenerateExecParser(node, checker, sqlID).parse - case "GlobalLimit" => - GlobalLimitExecParser(node, checker, sqlID).parse - case "HashAggregate" => - HashAggregateExecParser(node, checker, sqlID, app).parse - case "LocalLimit" => - LocalLimitExecParser(node, checker, sqlID).parse - case "InMemoryTableScan" => - InMemoryTableScanExecParser(node, checker, sqlID).parse - case i if DataWritingCommandExecParser.isWritingCmdExec(i) => - DataWritingCommandExecParser.parseNode(node, checker, sqlID) - case "MapInPandas" => - MapInPandasExecParser(node, checker, sqlID).parse - case "ObjectHashAggregate" => - ObjectHashAggregateExecParser(node, checker, sqlID, app).parse - case "Project" => - ProjectExecParser(node, checker, sqlID).parse - case "PythonMapInArrow" | "MapInArrow" => - PythonMapInArrowExecParser(node, checker, sqlID).parse - case "Range" => - RangeExecParser(node, checker, sqlID).parse - case "Sample" => - SampleExecParser(node, checker, sqlID).parse - case "ShuffledHashJoin" => - ShuffledHashJoinExecParser(node, checker, sqlID, app).parse - case "Sort" => - SortExecParser(node, checker, sqlID).parse - case s if ReadParser.isScanNode(s) => - FileSourceScanExecParser(node, checker, sqlID, app).parse - case "SortAggregate" => - SortAggregateExecParser(node, checker, sqlID).parse - case smj if SortMergeJoinExecParser.accepts(smj) => - SortMergeJoinExecParser(node, checker, sqlID).parse - case "SubqueryBroadcast" => - SubqueryBroadcastExecParser(node, checker, sqlID, app).parse - case sqe if SubqueryExecParser.accepts(sqe) => - SubqueryExecParser.parseNode(node, checker, sqlID, app) - case "TakeOrderedAndProject" => - TakeOrderedAndProjectExecParser(node, checker, sqlID).parse - case "Union" => - UnionExecParser(node, checker, sqlID).parse - case "Window" => - WindowExecParser(node, checker, sqlID).parse - case "WindowInPandas" => - WindowInPandasExecParser(node, checker, sqlID).parse - case "WindowGroupLimit" => - WindowGroupLimitParser(node, checker, sqlID).parse - case wfe if WriteFilesExecParser.accepts(wfe) => - WriteFilesExecParser(node, checker, sqlID).parse - case _ => - // Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as - // supported but with shouldRemove flag set to True. - // Setting the "shouldRemove" is handled at the end of the function. - ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, - isSupported = reuseExecs.contains(normalizedNodeName), None) + val parserConfig = ExecParserLoader.getConfig(normalizedNodeName) + parserConfig match { + case Some(_) => + new GenericExecParser(node, checker, sqlID).parse + case None => + normalizedNodeName match { + case "BatchScan" => + BatchScanExecParser(node, checker, sqlID, app).parse + case "BroadcastExchange" => + BroadcastExchangeExecParser(node, checker, sqlID, app).parse + case "BroadcastHashJoin" => + BroadcastHashJoinExecParser(node, checker, sqlID).parse + case "BroadcastNestedLoopJoin" => + BroadcastNestedLoopJoinExecParser(node, checker, sqlID).parse + case "CustomShuffleReader" | "AQEShuffleRead" => + CustomShuffleReaderExecParser(node, checker, sqlID).parse + case "Exchange" => + ShuffleExchangeExecParser(node, checker, sqlID, app).parse + case "HashAggregate" => + HashAggregateExecParser(node, checker, sqlID, app).parse + case i if DataWritingCommandExecParser.isWritingCmdExec(i) => + DataWritingCommandExecParser.parseNode(node, checker, sqlID) + case "ShuffledHashJoin" => + ShuffledHashJoinExecParser(node, checker, sqlID, app).parse + case s if ReadParser.isScanNode(s) => + FileSourceScanExecParser(node, checker, sqlID, app).parse + case smj if SortMergeJoinExecParser.accepts(smj) => + SortMergeJoinExecParser(node, checker, sqlID).parse + case "SubqueryBroadcast" => + SubqueryBroadcastExecParser(node, checker, sqlID, app).parse + case sqe if SubqueryExecParser.accepts(sqe) => + SubqueryExecParser.parseNode(node, checker, sqlID, app) + case "WindowGroupLimit" => + WindowGroupLimitParser(node, checker, sqlID).parse + case wfe if WriteFilesExecParser.accepts(wfe) => + WriteFilesExecParser(node, checker, sqlID).parse + case _ => + // Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as + // supported but with shouldRemove flag set to True. + // Setting the "shouldRemove" is handled at the end of the function. + ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, + isSupported = reuseExecs.contains(normalizedNodeName), None) + } } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala deleted file mode 100644 index 44f37b495..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SampleExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // sample doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala deleted file mode 100644 index cc56b4c88..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SortAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // SortAggregate doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("SortAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala deleted file mode 100644 index c89ca319e..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SortExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Sort doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Sort ", "") - val expressions = SQLPlanParser.parseSortExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala deleted file mode 100644 index 19f64d210..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class TakeOrderedAndProjectExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TakeOrderedAndProject doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("TakeOrderedAndProject ", "") - val expressions = SQLPlanParser.parseTakeOrderedExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala deleted file mode 100644 index 126d0f2b5..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class UnionExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Union doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala deleted file mode 100644 index a48b51c82..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class WindowExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Window doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Window ", "") - val expressions = SQLPlanParser.parseWindowExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala deleted file mode 100644 index 0b92986d6..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class WindowInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // WindowInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -}