diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala deleted file mode 100644 index 83a2f29f4..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class AggregateInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // AggregateInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala deleted file mode 100644 index 185c8d515..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ArrowEvalPythonExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // ArrowEvalPythonExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala deleted file mode 100644 index 728600132..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CartesianProductExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // CartesianProduct doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala deleted file mode 100644 index c57c7a87b..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CoalesceExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // coalesce doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala deleted file mode 100644 index ce74630fb..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CollectLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // collect doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala deleted file mode 100644 index 44f5443ff..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class CustomShuffleReaderExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - // note this is called either AQEShuffleRead and CustomShuffleReader depending - // on the Spark version, our supported ops list it as CustomShuffleReader - val fullExecName = "CustomShuffleReaderExec" - - override def parse: ExecInfo = { - // doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala deleted file mode 100644 index 78f860dfa..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ExpandExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Expand doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Expand ", "") - val expressions = SQLPlanParser.parseExpandExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala deleted file mode 100644 index 7b30342ec..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class FilterExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // filter doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Filter ", "") - val expressions = SQLPlanParser.parseFilterExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala deleted file mode 100644 index ab607e863..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class FlatMapGroupsInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // FlatMapCoGroupsInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala deleted file mode 100644 index 6a52dae47..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class GenerateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Generate doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Generate ", "") - val expressions = SQLPlanParser.parseGenerateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala new file mode 100644 index 000000000..cd67502c5 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker + +import org.apache.spark.sql.execution.ui.SparkPlanGraphNode +import org.apache.spark.sql.rapids.tool.{AppBase, UnsupportedExpr} + +class GenericExecParser( + val node: SparkPlanGraphNode, + val checker: PluginTypeChecker, + val sqlID: Long, + val execName: Option[String] = None, + val expressionFunction: Option[String => Array[String]] = None, + val app: Option[AppBase] = None +) extends ExecParser { + + val fullExecName: String = execName.getOrElse(node.name + "Exec") + + override def parse: ExecInfo = { + val duration = computeDuration + val expressions = parseExpressions() + val notSupportedExprs = getNotSupportedExprs(expressions) + val isExecSupported = checker.isExecSupported(fullExecName) && + notSupportedExprs.isEmpty + + val (speedupFactor, isSupported) = if (isExecSupported) { + (checker.getSpeedupFactor(fullExecName), true) + } else { + (1.0, false) + } + + createExecInfo(speedupFactor, isSupported, duration, notSupportedExprs) + } + + protected def parseExpressions(): Array[String] = { + expressionFunction match { + case Some(func) => + val exprString = getExprString + func(exprString) + case None => + Array.empty[String] + } + } + + protected def getExprString: String = { + node.desc.replaceFirst(s"${node.name} ", "") + } + + protected def getNotSupportedExprs(expressions: Array[String]): Seq[UnsupportedExpr] = { + checker.getNotSupportedExprs(expressions) + } + + protected def getDurationMetricIds: Seq[Long] = { + Seq.empty + } + + protected def computeDuration: Option[Long] = { + // Sum the durations for all metrics returned by getDurationMetricIds + val durations = getDurationMetricIds.flatMap { metricId => + app.flatMap(appInstance => SQLPlanParser.getTotalDuration(Some(metricId), appInstance)) + } + durations.reduceOption(_ + _) + } + + protected def createExecInfo( + speedupFactor: Double, + isSupported: Boolean, + duration: Option[Long], + notSupportedExprs: Seq[UnsupportedExpr] + ): ExecInfo = { + ExecInfo( + node, + sqlID, + node.name, + "", + speedupFactor, + duration, + node.id, + isSupported, + None, + unsupportedExprs = notSupportedExprs + ) + } +} + +object GenericExecParser { + def apply( + node: SparkPlanGraphNode, + checker: PluginTypeChecker, + sqlID: Long, + execName: Option[String] = None, + expressionFunction: Option[String => Array[String]] = None, + app: Option[AppBase] = None + ): GenericExecParser = { + val fullExecName = execName.getOrElse(node.name + "Exec") + new GenericExecParser(node, checker, sqlID, Some(fullExecName), expressionFunction, app) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala deleted file mode 100644 index 931f7375b..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class GlobalLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // GlobalLimit doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala index 3676c81a7..9dc5e0d1c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala @@ -23,30 +23,13 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode import org.apache.spark.sql.rapids.tool.AppBase case class HashAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long, - app: AppBase) extends ExecParser with Logging { + override val node: SparkPlanGraphNode, + override val checker: PluginTypeChecker, + override val sqlID: Long, + appBase: AppBase) extends + GenericExecParser(node, checker, sqlID, app = Some(appBase)) with Logging { - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TODO - Its partial duration only. We need a way to specify it as partial. - val accumId = node.metrics.find( - _.name == "time in aggregation build total").map(_.accumulatorId) - val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) - val exprString = node.desc.replaceFirst("HashAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, maxDuration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) + override def getDurationMetricIds: Seq[Long] = { + node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId).toSeq } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala deleted file mode 100644 index d011ee46e..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class InMemoryTableScanExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - val duration = None - val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala deleted file mode 100644 index 6be27aa22..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class LocalLimitExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // LocalLimit doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala deleted file mode 100644 index f9d57e313..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class MapInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // MapInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala index 8632b2d45..c66b60f2f 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala @@ -23,29 +23,13 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode import org.apache.spark.sql.rapids.tool.AppBase case class ObjectHashAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long, - app: AppBase) extends ExecParser with Logging { + override val node: SparkPlanGraphNode, + override val checker: PluginTypeChecker, + override val sqlID: Long, + appBase: AppBase) extends + GenericExecParser(node, checker, sqlID, app = Some(appBase)) with Logging { - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TODO - Its partial duration only. We need a way to specify it as partial. - val accumId = node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId) - val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) - val exprString = node.desc.replaceFirst("ObjectHashAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - maxDuration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) + override def getDurationMetricIds: Seq[Long] = { + node.metrics.find(_.name == "time in aggregation build").map(_.accumulatorId).toSeq } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala deleted file mode 100644 index 838c1c6d2..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class ProjectExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Project doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Project ", "") - val expressions = SQLPlanParser.parseProjectExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala deleted file mode 100644 index 08f8eedef..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/PythonMapInArrowExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class PythonMapInArrowExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName: String = node.name + "Exec" - - override def parse: ExecInfo = { - // PythonMapInArrow doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala deleted file mode 100644 index 32b3ae4fb..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class RangeExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // range doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 47d42d102..c4b31f78d 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -26,6 +26,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlanInfo +//import org.apache.spark.sql.execution.joins.CartesianProductExec import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode} import org.apache.spark.sql.rapids.tool.{AppBase, BuildSide, ExecHelper, JoinType, RDDCheckHelper, ToolUtils, UnsupportedExpr} import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph @@ -463,10 +464,13 @@ object SQLPlanParser extends Logging { app: AppBase): ExecInfo = { val normalizedNodeName = node.name.stripSuffix("$") normalizedNodeName match { - case "AggregateInPandas" => - AggregateInPandasExecParser(node, checker, sqlID).parse - case "ArrowEvalPython" => - ArrowEvalPythonExecParser(node, checker, sqlID).parse + // Generalize all the execs that call GenericExecParser in one case + case "AggregateInPandas" | "ArrowEvalPython" | "AQEShuffleRead" | "CartesianProduct" + | "Coalesce" | "CollectLimit" | "CustomShuffleReader" | "FlatMapGroupsInPandas" + | "GlobalLimit" | "LocalLimit" | "InMemoryTableScan" | "MapInPandas" + | "PythonMapInArrow" | "MapInArrow" | "Range" | "Sample" | "Union" + | "WindowInPandas" => + GenericExecParser(node, checker, sqlID, app = Some(app)).parse case "BatchScan" => BatchScanExecParser(node, checker, sqlID, app).parse case "BroadcastExchange" => @@ -475,54 +479,36 @@ object SQLPlanParser extends Logging { BroadcastHashJoinExecParser(node, checker, sqlID).parse case "BroadcastNestedLoopJoin" => BroadcastNestedLoopJoinExecParser(node, checker, sqlID).parse - case "CartesianProduct" => - CartesianProductExecParser(node, checker, sqlID).parse - case "Coalesce" => - CoalesceExecParser(node, checker, sqlID).parse - case "CollectLimit" => - CollectLimitExecParser(node, checker, sqlID).parse - case "CustomShuffleReader" | "AQEShuffleRead" => - CustomShuffleReaderExecParser(node, checker, sqlID).parse case "Exchange" => ShuffleExchangeExecParser(node, checker, sqlID, app).parse case "Expand" => - ExpandExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseExpandExpressions)).parse case "Filter" => - FilterExecParser(node, checker, sqlID).parse - case "FlatMapGroupsInPandas" => - FlatMapGroupsInPandasExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseFilterExpressions)).parse case "Generate" => - GenerateExecParser(node, checker, sqlID).parse - case "GlobalLimit" => - GlobalLimitExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseGenerateExpressions)).parse case "HashAggregate" => HashAggregateExecParser(node, checker, sqlID, app).parse - case "LocalLimit" => - LocalLimitExecParser(node, checker, sqlID).parse - case "InMemoryTableScan" => - InMemoryTableScanExecParser(node, checker, sqlID).parse case i if DataWritingCommandExecParser.isWritingCmdExec(i) => DataWritingCommandExecParser.parseNode(node, checker, sqlID) - case "MapInPandas" => - MapInPandasExecParser(node, checker, sqlID).parse case "ObjectHashAggregate" => ObjectHashAggregateExecParser(node, checker, sqlID, app).parse case "Project" => - ProjectExecParser(node, checker, sqlID).parse - case "PythonMapInArrow" | "MapInArrow" => - PythonMapInArrowExecParser(node, checker, sqlID).parse - case "Range" => - RangeExecParser(node, checker, sqlID).parse - case "Sample" => - SampleExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseProjectExpressions)).parse case "ShuffledHashJoin" => ShuffledHashJoinExecParser(node, checker, sqlID, app).parse case "Sort" => - SortExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseSortExpressions)).parse case s if ReadParser.isScanNode(s) => FileSourceScanExecParser(node, checker, sqlID, app).parse case "SortAggregate" => - SortAggregateExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseAggregateExpressions)).parse case smj if SortMergeJoinExecParser.accepts(smj) => SortMergeJoinExecParser(node, checker, sqlID).parse case "SubqueryBroadcast" => @@ -530,13 +516,11 @@ object SQLPlanParser extends Logging { case sqe if SubqueryExecParser.accepts(sqe) => SubqueryExecParser.parseNode(node, checker, sqlID, app) case "TakeOrderedAndProject" => - TakeOrderedAndProjectExecParser(node, checker, sqlID).parse - case "Union" => - UnionExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseTakeOrderedExpressions)).parse case "Window" => - WindowExecParser(node, checker, sqlID).parse - case "WindowInPandas" => - WindowInPandasExecParser(node, checker, sqlID).parse + GenericExecParser( + node, checker, sqlID, expressionFunction = Some(parseWindowExpressions)).parse case "WindowGroupLimit" => WindowGroupLimitParser(node, checker, sqlID).parse case wfe if WriteFilesExecParser.accepts(wfe) => diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala deleted file mode 100644 index 44f37b495..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SampleExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // sample doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala deleted file mode 100644 index cc56b4c88..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SortAggregateExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // SortAggregate doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("SortAggregate", "") - val expressions = SQLPlanParser.parseAggregateExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala deleted file mode 100644 index c89ca319e..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class SortExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Sort doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Sort ", "") - val expressions = SQLPlanParser.parseSortExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala deleted file mode 100644 index 19f64d210..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class TakeOrderedAndProjectExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // TakeOrderedAndProject doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("TakeOrderedAndProject ", "") - val expressions = SQLPlanParser.parseTakeOrderedExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, - duration, node.id, isSupported, None, unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala deleted file mode 100644 index 126d0f2b5..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class UnionExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Union doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala deleted file mode 100644 index a48b51c82..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class WindowExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // Window doesn't have duration - val duration = None - val exprString = node.desc.replaceFirst("Window ", "") - val expressions = SQLPlanParser.parseWindowExpressions(exprString) - val notSupportedExprs = checker.getNotSupportedExprs(expressions) - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName) && - notSupportedExprs.isEmpty) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None, - unsupportedExprs = notSupportedExprs) - } -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala deleted file mode 100644 index 0b92986d6..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.planparser - -import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker - -import org.apache.spark.sql.execution.ui.SparkPlanGraphNode - -case class WindowInPandasExecParser( - node: SparkPlanGraphNode, - checker: PluginTypeChecker, - sqlID: Long) extends ExecParser { - - val fullExecName = node.name + "Exec" - - override def parse: ExecInfo = { - // WindowInPandasExec doesn't have duration - val duration = None - val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { - (checker.getSpeedupFactor(fullExecName), true) - } else { - (1.0, false) - } - // TODO - add in parsing expressions - average speedup across? - ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) - } -} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index 7c3948f9a..f9c4c86c4 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -254,7 +254,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite { assert(wholeStages.forall(_.duration.nonEmpty)) val allChildren = wholeStages.flatMap(_.children).flatten val hashAggregate = allChildren.filter(_.exec == "HashAggregate") - assertSizeAndSupported(2, hashAggregate) + assertSizeAndSupported(2, hashAggregate, checkDurations = false) } } } @@ -1243,7 +1243,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite { } val execInfo = getAllExecsFromPlan(parsedPlans.toSeq) val hashAggregate = execInfo.filter(_.exec == "HashAggregate") - assertSizeAndSupported(2, hashAggregate) + assertSizeAndSupported(2, hashAggregate, checkDurations = false) } } @@ -1799,7 +1799,7 @@ class SQLPlanParserSuite extends BasePlanParserSuite { // if the min_by and max_by were not recognized, the test would fail val hashAggExecs = getAllExecsFromPlan(parsedPlans.toSeq).filter(_.exec.equals("HashAggregate")) - assertSizeAndSupported(2, hashAggExecs) + assertSizeAndSupported(2, hashAggExecs, checkDurations = false) } }