Skip to content

Commit

Permalink
Refactor Exec Parsers - unify the code calling similar methods
Browse files Browse the repository at this point in the history
This PR is to remove redundant code in many ExecParsers. We have
several ExecParsers with the same functionality in parse code.
In this PR, following changes are done:

1. If the Execs don't have expressions, then execName is assigned and
ExecInfo object is created from GenericExecParser.
2. If the Execs have expressions then reflection is used to assign the
appropriate parser to be used.
3. If there is any additional calculation in the parse function, then
only that part is overridden in the ExecParser and the rest of the
function uses the GenericExecParser base code.
4. Removed all the files which are not needed anymore.

Signed-off-by: Niranjan Artal <[email protected]>
  • Loading branch information
nartal1 committed Oct 29, 2024
1 parent 8223ee1 commit 760f1d1
Show file tree
Hide file tree
Showing 29 changed files with 378 additions and 1,106 deletions.
117 changes: 117 additions & 0 deletions core/src/main/resources/execParserMappings/execParser.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{
"parsers": [
{
"execName": "AggregateInPandas",
"parseExpressions": false
},
{
"execName": "ArrowEvalPython",
"parseExpressions": false
},
{
"execName": "CartesianProduct",
"parseExpressions": false
},
{
"execName": "Coalesce",
"parseExpressions": false
},
{
"execName": "CollectLimit",
"parseExpressions": false
},
{
"execName": "CustomShuffleReader",
"parseExpressions": false
},
{
"execName": "FlatMapGroupsInPandas",
"parseExpressions": false
},
{
"execName": "InMemoryTableScan",
"parseExpressions": false
},
{
"execName": "LocalLimit",
"parseExpressions": false
},
{
"execName": "GlobalLimit",
"parseExpressions": false
},
{
"execName": "MapInArrow",
"parseExpressions": false
},
{
"execName": "MapInPandas",
"parseExpressions": false
},
{
"execName": "PythonMapInArrow",
"parseExpressions": false
},
{
"execName": "Range",
"parseExpressions": false
},
{
"execName": "Sample",
"parseExpressions": false
},
{
"execName": "Union",
"parseExpressions": false
},
{
"execName": "WindowInPandas",
"parseExpressions": false
},
{
"execName": "Expand",
"parseExpressions": true,
"expressionParserMethod": "parseExpandExpressions"
},
{
"execName": "Filter",
"parseExpressions": true,
"expressionParserMethod": "parseFilterExpressions"
},
{
"execName": "Generate",
"parseExpressions": true,
"expressionParserMethod": "parseGenerateExpressions"
},
{
"execName": "ObjectHashAggregate",
"parseExpressions": true,
"expressionParserMethod": "parseAggregateExpressions"
},
{
"execName": "Project",
"parseExpressions": true,
"expressionParserMethod": "parseProjectExpressions"
},
{
"execName": "SortAggregate",
"parseExpressions": true,
"expressionParserMethod": "parseAggregateExpressions"
},
{
"execName": "Sort",
"parseExpressions": true,
"expressionParserMethod": "parseSortExpressions"
},
{
"execName": "TakeOrderedAndProject",
"parseExpressions": true,
"expressionParserMethod": "parseTakeOrderedExpressions"
},
{
"execName": "Window",
"parseExpressions": true,
"expressionParserMethod": "parseWindowExpressions"
}
]
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,11 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

case class CustomShuffleReaderExecParser(
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long) extends ExecParser {
override val node: SparkPlanGraphNode,
override val checker: PluginTypeChecker,
override val sqlID: Long) extends GenericExecParser(node, checker,sqlID) {

// note this is called either AQEShuffleRead or CustomShuffleReader depending
// on the Spark version; our supported ops list it as CustomShuffleReader
val fullExecName = "CustomShuffleReaderExec"

override def parse: ExecInfo = {
// doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1.0, false)
}
ExecInfo(node, sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
override val fullExecName = "CustomShuffleReaderExec"
}
Loading

0 comments on commit 760f1d1

Please sign in to comment.