From 88e9c57c35b41dbf1ab04e02ede59cd5ffd4165d Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Wed, 30 Oct 2024 22:58:20 -0700 Subject: [PATCH] Remove reflection instead use case classes to match and call respective Parsers Signed-off-by: Niranjan Artal --- .../execParserMappings/execParser.json | 117 ---------------- .../tool/planparser/ExecParserLoader.scala | 84 ------------ .../tool/planparser/GenericExecParser.scala | 52 +++---- .../tool/planparser/SQLPlanParser.scala | 128 ++++++++++++------ 4 files changed, 103 insertions(+), 278 deletions(-) delete mode 100644 core/src/main/resources/execParserMappings/execParser.json delete mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParserLoader.scala diff --git a/core/src/main/resources/execParserMappings/execParser.json b/core/src/main/resources/execParserMappings/execParser.json deleted file mode 100644 index dab562cab..000000000 --- a/core/src/main/resources/execParserMappings/execParser.json +++ /dev/null @@ -1,117 +0,0 @@ -{ - "parsers": [ - { - "execName": "AggregateInPandas", - "parseExpressions": false - }, - { - "execName": "ArrowEvalPython", - "parseExpressions": false - }, - { - "execName": "CartesianProduct", - "parseExpressions": false - }, - { - "execName": "Coalesce", - "parseExpressions": false - }, - { - "execName": "CollectLimit", - "parseExpressions": false - }, - { - "execName": "CustomShuffleReader", - "parseExpressions": false - }, - { - "execName": "FlatMapGroupsInPandas", - "parseExpressions": false - }, - { - "execName": "InMemoryTableScan", - "parseExpressions": false - }, - { - "execName": "LocalLimit", - "parseExpressions": false - }, - { - "execName": "GlobalLimit", - "parseExpressions": false - }, - { - "execName": "MapInArrow", - "parseExpressions": false - }, - { - "execName": "MapInPandas", - "parseExpressions": false - }, - { - "execName": "PythonMapInArrow", - "parseExpressions": false - }, - { - "execName": "Range", - "parseExpressions": false - }, - { - "execName": "Sample", - "parseExpressions": false - }, - { - "execName": "Union", - "parseExpressions": false - }, - { - "execName": "WindowInPandas", - "parseExpressions": false - }, - { - "execName": "Expand", - "parseExpressions": true, - "expressionParserMethod": "parseExpandExpressions" - }, - { - "execName": "Filter", - "parseExpressions": true, - "expressionParserMethod": "parseFilterExpressions" - }, - { - "execName": "Generate", - "parseExpressions": true, - "expressionParserMethod": "parseGenerateExpressions" - }, - { - "execName": "ObjectHashAggregate", - "parseExpressions": true, - "expressionParserMethod": "parseAggregateExpressions" - }, - { - "execName": "Project", - "parseExpressions": true, - "expressionParserMethod": "parseProjectExpressions" - }, - { - "execName": "SortAggregate", - "parseExpressions": true, - "expressionParserMethod": "parseAggregateExpressions" - }, - { - "execName": "Sort", - "parseExpressions": true, - "expressionParserMethod": "parseSortExpressions" - }, - { - "execName": "TakeOrderedAndProject", - "parseExpressions": true, - "expressionParserMethod": "parseTakeOrderedExpressions" - }, - { - "execName": "Window", - "parseExpressions": true, - "expressionParserMethod": "parseWindowExpressions" - } - ] -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParserLoader.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParserLoader.scala deleted file mode 100644 index 223c23ba8..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExecParserLoader.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import java.nio.file.Paths - -import scala.collection.mutable -import scala.reflect.runtime.universe._ - -import org.json4s._ -import org.json4s.jackson.JsonMethods - -import org.apache.spark.sql.rapids.tool.util.UTF8Source - -object ReflectionCache { - private val methodCache = mutable.Map[String, MethodSymbol]() - - def getMethodSymbol(classType: Type, methodName: String): MethodSymbol = { - methodCache.getOrElseUpdate(methodName, { - classType.decl(TermName(methodName)).asMethod - }) - } -} - -object ExecParserLoader { - case class ParserConfig( - execName: String, - parseExpressions: Boolean, - expressionParserMethod: Option[MethodSymbol], - durationMetrics: Option[List[String]] - ) - - val EXEC_PARSER_DIRECTORY= "execParserMappings" - val DEFAULT_EXEC_PARSER_FILE = "execParser.json" - - implicit val formats: Formats = DefaultFormats - - lazy val mirror = runtimeMirror(getClass.getClassLoader) - lazy val sqlPlanParserModule = mirror.staticModule( - "com.nvidia.spark.rapids.tool.planparser.SQLPlanParser") - lazy val sqlPlanParserType = sqlPlanParserModule.moduleClass.asType.toType - - val mappingFile = Paths.get(EXEC_PARSER_DIRECTORY, DEFAULT_EXEC_PARSER_FILE).toString - val jsonString = UTF8Source.fromResource(mappingFile).mkString - val json = JsonMethods.parse(jsonString) - val parsersList = (json \ "parsers").extract[List[Map[String, Any]]] - - lazy val parsersConfig: Map[String, ParserConfig] = { - parsersList.map { parserMap => - val execName = parserMap("execName").asInstanceOf[String] - val parseExpressions = parserMap.get( - "parseExpressions").map(_.asInstanceOf[Boolean]).getOrElse(false) - val expressionParserMethodName = parserMap.get( - "expressionParserMethod").map(_.asInstanceOf[String]) - val expressionParserMethod = expressionParserMethodName.map { methodName => - ReflectionCache.getMethodSymbol(sqlPlanParserType, methodName) - } - val durationMetrics = parserMap.get("durationMetrics").map(_.asInstanceOf[List[String]]) - - execName -> ParserConfig( - execName, - parseExpressions, - expressionParserMethod, - durationMetrics - ) - }.toMap - } - - def getConfig(execName: String): Option[ParserConfig] = parsersConfig.get(execName) -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala index 43dd7ae65..5fb59630e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala @@ -16,8 +16,6 @@ package com.nvidia.spark.rapids.tool.planparser -import scala.reflect.runtime.universe._ - import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.sql.execution.ui.SparkPlanGraphNode @@ -27,14 +25,11 @@ class GenericExecParser( val node: SparkPlanGraphNode, val checker: PluginTypeChecker, val sqlID: Long, + val expressionFunction: Option[String => Array[String]] = None, val app: Option[AppBase] = None ) extends ExecParser { val fullExecName: String = node.name + "Exec" - val parserConfig: Option[ExecParserLoader.ParserConfig] = - ExecParserLoader.getConfig(node.name) - // Cache to store reflected methods to avoid redundant reflection calls - private val methodCache = scala.collection.mutable.Map[String, MethodMirror]() override def parse: ExecInfo = { val duration = computeDuration @@ -54,35 +49,12 @@ class GenericExecParser( } protected def parseExpressions(): Array[String] = { - parserConfig match { - case Some(config) if config.parseExpressions => + expressionFunction match { + case Some(func) => val exprString = getExprString - val methodSymbol = config.expressionParserMethod.getOrElse { - throw new IllegalArgumentException( - s"Expression parser method not specified for ${node.name}") - } - invokeCachedParserMethod(methodSymbol, exprString) - - case _ => Array.empty[String] // Default behavior when parsing is not required - } - } - - // Helper method to invoke the parser method with caching - private def invokeCachedParserMethod( - methodSymbol: MethodSymbol, exprString: String): Array[String] = { - // This is to check if the method is already cached, else reflect and cache it - val cachedMethod = methodCache.getOrElseUpdate(methodSymbol.fullName, { - val mirror = runtimeMirror(getClass.getClassLoader) - val module = mirror.reflectModule(mirror.staticModule( - "com.nvidia.spark.rapids.tool.planparser.SQLPlanParser")) - val instanceMirror = mirror.reflect(module.instance) - instanceMirror.reflectMethod(methodSymbol) // Cache this reflected method - }) - - cachedMethod(exprString) match { - case expressions: Array[String] => expressions - case _ => throw new IllegalArgumentException( - s"Unexpected return type from method: ${methodSymbol.name}") + func(exprString) + case None => + Array.empty[String] } } @@ -121,3 +93,15 @@ class GenericExecParser( ) } } + +object GenericExecParser { + def apply( + node: SparkPlanGraphNode, + checker: PluginTypeChecker, + sqlID: Long, + expressionFunction: Option[String => Array[String]] = None, + app: Option[AppBase] = None + ): GenericExecParser = { + new GenericExecParser(node, checker, sqlID, expressionFunction, app) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index a8e481259..fbd7c182e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -462,49 +462,91 @@ object SQLPlanParser extends Logging { checker: PluginTypeChecker, app: AppBase): ExecInfo = { val normalizedNodeName = node.name.stripSuffix("$") - val parserConfig = ExecParserLoader.getConfig(normalizedNodeName) - parserConfig match { - case Some(_) => - new GenericExecParser(node, checker, sqlID).parse - case None => - normalizedNodeName match { - case "BatchScan" => - BatchScanExecParser(node, checker, sqlID, app).parse - case "BroadcastExchange" => - BroadcastExchangeExecParser(node, checker, sqlID, app).parse - case "BroadcastHashJoin" => - BroadcastHashJoinExecParser(node, checker, sqlID).parse - case "BroadcastNestedLoopJoin" => - BroadcastNestedLoopJoinExecParser(node, checker, sqlID).parse - case "CustomShuffleReader" | "AQEShuffleRead" => - CustomShuffleReaderExecParser(node, checker, sqlID).parse - case "Exchange" => - ShuffleExchangeExecParser(node, checker, sqlID, app).parse - case "HashAggregate" => - HashAggregateExecParser(node, checker, sqlID, app).parse - case i if DataWritingCommandExecParser.isWritingCmdExec(i) => - DataWritingCommandExecParser.parseNode(node, checker, sqlID) - case "ShuffledHashJoin" => - ShuffledHashJoinExecParser(node, checker, sqlID, app).parse - case s if ReadParser.isScanNode(s) => - FileSourceScanExecParser(node, checker, sqlID, app).parse - case smj if SortMergeJoinExecParser.accepts(smj) => - SortMergeJoinExecParser(node, checker, sqlID).parse - case "SubqueryBroadcast" => - SubqueryBroadcastExecParser(node, checker, sqlID, app).parse - case sqe if SubqueryExecParser.accepts(sqe) => - SubqueryExecParser.parseNode(node, checker, sqlID, app) - case "WindowGroupLimit" => - WindowGroupLimitParser(node, checker, sqlID).parse - case wfe if WriteFilesExecParser.accepts(wfe) => - WriteFilesExecParser(node, checker, sqlID).parse - case _ => - // Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as - // supported but with shouldRemove flag set to True. - // Setting the "shouldRemove" is handled at the end of the function. - ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, - isSupported = reuseExecs.contains(normalizedNodeName), None) - } + normalizedNodeName match { + case "AggregateInPandas" => + GenericExecParser(node, checker, sqlID).parse + case "ArrowEvalPython" => + GenericExecParser(node, checker, sqlID).parse + case "BatchScan" => + BatchScanExecParser(node, checker, sqlID, app).parse + case "BroadcastExchange" => + BroadcastExchangeExecParser(node, checker, sqlID, app).parse + case "BroadcastHashJoin" => + BroadcastHashJoinExecParser(node, checker, sqlID).parse + case "BroadcastNestedLoopJoin" => + BroadcastNestedLoopJoinExecParser(node, checker, sqlID).parse + case "CartesianProduct" => + GenericExecParser(node, checker, sqlID).parse + case "Coalesce" => + GenericExecParser(node, checker, sqlID).parse + case "CollectLimit" => + GenericExecParser(node, checker, sqlID).parse + case "CustomShuffleReader" | "AQEShuffleRead" => + CustomShuffleReaderExecParser(node, checker, sqlID).parse + case "Exchange" => + ShuffleExchangeExecParser(node, checker, sqlID, app).parse + case "Expand" => + GenericExecParser(node, checker, sqlID, Some(parseExpandExpressions)).parse + case "Filter" => + GenericExecParser(node, checker, sqlID, Some(parseFilterExpressions)).parse + case "FlatMapGroupsInPandas" => + GenericExecParser(node, checker, sqlID).parse + case "Generate" => + GenericExecParser(node, checker, sqlID, Some(parseGenerateExpressions)).parse + case "GlobalLimit" => + GenericExecParser(node, checker, sqlID).parse + case "HashAggregate" => + HashAggregateExecParser(node, checker, sqlID, app).parse + case "LocalLimit" => + GenericExecParser(node, checker, sqlID).parse + case "InMemoryTableScan" => + GenericExecParser(node, checker, sqlID).parse + case i if DataWritingCommandExecParser.isWritingCmdExec(i) => + DataWritingCommandExecParser.parseNode(node, checker, sqlID) + case "MapInPandas" => + GenericExecParser(node, checker, sqlID).parse + case "ObjectHashAggregate" => + ObjectHashAggregateExecParser(node, checker, sqlID, app).parse + case "Project" => + GenericExecParser(node, checker, sqlID, Some(parseProjectExpressions)).parse + case "PythonMapInArrow" | "MapInArrow" => + GenericExecParser(node, checker, sqlID).parse + case "Range" => + GenericExecParser(node, checker, sqlID).parse + case "Sample" => + GenericExecParser(node, checker, sqlID).parse + case "ShuffledHashJoin" => + ShuffledHashJoinExecParser(node, checker, sqlID, app).parse + case "Sort" => + GenericExecParser(node, checker, sqlID, Some(parseSortExpressions)).parse + case s if ReadParser.isScanNode(s) => + FileSourceScanExecParser(node, checker, sqlID, app).parse + case "SortAggregate" => + GenericExecParser(node, checker, sqlID, Some(parseAggregateExpressions)).parse + case smj if SortMergeJoinExecParser.accepts(smj) => + SortMergeJoinExecParser(node, checker, sqlID).parse + case "SubqueryBroadcast" => + SubqueryBroadcastExecParser(node, checker, sqlID, app).parse + case sqe if SubqueryExecParser.accepts(sqe) => + SubqueryExecParser.parseNode(node, checker, sqlID, app) + case "TakeOrderedAndProject" => + GenericExecParser(node, checker, sqlID, Some(parseTakeOrderedExpressions)).parse + case "Union" => + GenericExecParser(node, checker, sqlID).parse + case "Window" => + GenericExecParser(node, checker, sqlID, Some(parseWindowExpressions)).parse + case "WindowInPandas" => + GenericExecParser(node, checker, sqlID).parse + case "WindowGroupLimit" => + WindowGroupLimitParser(node, checker, sqlID).parse + case wfe if WriteFilesExecParser.accepts(wfe) => + WriteFilesExecParser(node, checker, sqlID).parse + case _ => + // Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as + // supported but with shouldRemove flag set to True. + // Setting the "shouldRemove" is handled at the end of the function. + ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, + isSupported = reuseExecs.contains(normalizedNodeName), None) } }