Skip to content

Commit

Permalink
Refactoring the speedup factor generation to support WholeStageCodege…
Browse files Browse the repository at this point in the history
…n parsing and environment defaults (NVIDIA#493)

* Refactoring the code to support WholeStageCodegen parsing for CPU and adding environment default

Signed-off-by: Matt Ahrens <[email protected]>

* Adding checks before creating speedup factor entries

Signed-off-by: Matt Ahrens <[email protected]>

---------

Signed-off-by: Matt Ahrens <[email protected]>
  • Loading branch information
mattahrens authored Aug 17, 2023
1 parent 7471194 commit 5ef39b8
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 33 deletions.
8 changes: 4 additions & 4 deletions user_tools/custom_speedup_factors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ spark_rapids_user_tools onprem profiling --csv --eventlogs CPU-3k --local_folder
spark_rapids_user_tools onprem profiling --csv --eventlogs GPU-3k --local_folder GPU-3k-profile
```
3. Speedup factor generation
1. Run the speedup factor generation script, passing the CPU and GPU profiler output.
1. Run the speedup factor generation script, passing the CPU and GPU profiler output along with a CSV output filename.
```
python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile
python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile --output newScores.csv
```

The output will showcase what operators were detected in the benchmarks to be used as custom speedups. You can then update values from the default [operatorsScore.csv](https://github.com/NVIDIA/spark-rapids-tools/blob/dev/core/src/main/resources/operatorsScore.csv) file to create your own version with the custom speedup factors generated by the output.
The script will generate the new scores in the output specified by the `--output` argument.

## Running Workload Qualification with Custom Speedup Factors

Now that you have a custom *operatorsScore.csv* file, you can run the Spark RAPIDS qualification tool using it to get estimations applicable for your environment. Here is the command to run with a custom speedup factor file:
```
spark_rapids_user_tools onprem qualification --speedup-factor-file operatorsScore.csv --eventlogs <CPU-event-logs>
spark_rapids_user_tools onprem qualification --speedup-factor-file newScores.csv --eventlogs <CPU-event-logs>
```
18 changes: 18 additions & 0 deletions user_tools/custom_speedup_factors/defaultScores.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CPUOperator,Score
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
MapInPandasExec,1.2
WindowInPandasExec,1.2
KMeans-pyspark,8.86
KMeans-scala,1
PCA-pyspark,2.24
PCA-scala,2.69
LinearRegression-pyspark,2
LinearRegression-scala,1
RandomForestClassifier-pyspark,6.31
RandomForestClassifier-scala,1
RandomForestRegressor-pyspark,3.66
RandomForestRegressor-scala,1
XGBoost-pyspark,1
XGBoost-scala,3.31
104 changes: 75 additions & 29 deletions user_tools/custom_speedup_factors/generate_speedup_factors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
parser = argparse.ArgumentParser(description="Speedup Factor Analysis")
parser.add_argument("--cpu", type=str, help="Directory of CPU profiler logs", required=True)
parser.add_argument("--gpu", type=str, help="Directory of GPU profiler logs", required=True)
parser.add_argument("--output", type=str, help="Filename for custom speedup factors", required=True)
parser.add_argument("--verbose", action="store_true", help="flag to generate full verbose output for logging raw node results")
parser.add_argument("--chdir", action="store_true", help="flag to change to work dir that's the script located")
args = parser.parse_args()

cpu_dir = args.cpu
gpu_dir = args.gpu
output = args.output
verbose = args.verbose

cpu_stage_log = {}
Expand All @@ -55,13 +57,26 @@
mapping_info = mapping_info.groupby(['SQL Node'])['Child Node'].apply(','.join).reset_index()

# - process sql_plan_metrics_for_application.csv
# - load in "duration" (CPU) or "op time" (GPU)
# - load in "duration" (CPU)
# - replace WholeStageCodegen (CPU only) with list of operators from mapping lookup file
# - mapping_info.parent = sql_times.nodeName
cpu_sql_info = pd.read_csv(cpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv")
cpu_sql_times = cpu_sql_info[cpu_sql_info["name"] == "duration"]
cpu_sql_combined = cpu_sql_times.set_index('nodeName').join(mapping_info.set_index('SQL Node'), how='left')

# - parse WholeStageCodegen durations with child node mapping
cpu_sql_times_df = cpu_sql_combined[['Child Node', 'max_value']]

for index, row in cpu_sql_times_df.iterrows():
operators = str(row['Child Node']).split(',')
duration = row['max_value']/len(operators)/1000.0
for operator in operators:
if operator in cpu_stage_log[app_name]:
cpu_stage_log[app_name][operator] = cpu_stage_log[app_name][operator] + duration
else:
cpu_stage_log[app_name][operator] = duration

# - parse top-level execs from sql_to_stage_information.csv
cpu_stage_info = pd.read_csv(cpu_dir + "/" + app + "/sql_to_stage_information.csv")
cpu_stage_times = cpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']]

Expand Down Expand Up @@ -92,12 +107,8 @@
app_name = app_info.loc[0]["appName"]
gpu_stage_log[app_name] = {}

# - process sql_plan_metrics_for_application.csv
# - load in "duration" (CPU) or "op time" (GPU)
# - mapping_info.parent = sql_times.nodeName
gpu_sql_info = pd.read_csv(gpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv")
gpu_sql_times = gpu_sql_info[gpu_sql_info["name"] == "op time"]

# - process sql_to_stage_information.csv to get stage durations
# - split up duration by operators listed in each stage
gpu_stage_info = pd.read_csv(gpu_dir + "/" + app + "/sql_to_stage_information.csv")
gpu_stage_times = gpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']]

Expand All @@ -111,41 +122,76 @@
else:
gpu_stage_log[app_name][op_key] = duration

# Sum up SQL operators for each
stage_totals = {}
cpu_stage_totals = {}
gpu_stage_totals = {}
cpu_stage_total = 0.0
gpu_stage_total = 0.0

# Sum up SQL operators for each operator found in CPU and GPU
for app_key in cpu_stage_log:
for op_key in cpu_stage_log[app_key]:
if op_key not in stage_totals:
stage_totals[op_key] = cpu_stage_log[app_key][op_key]
if op_key not in cpu_stage_totals:
cpu_stage_totals[op_key] = cpu_stage_log[app_key][op_key]
else:
stage_totals[op_key] = stage_totals[op_key] + cpu_stage_log[app_key][op_key]
cpu_stage_totals[op_key] = cpu_stage_totals[op_key] + cpu_stage_log[app_key][op_key]
cpu_stage_total = cpu_stage_total + cpu_stage_log[app_key][op_key]


for app_key in gpu_stage_log:
for op_key in gpu_stage_log[app_key]:
if op_key not in stage_totals:
stage_totals[op_key] = gpu_stage_log[app_key][op_key]
if op_key not in gpu_stage_totals:
gpu_stage_totals[op_key] = gpu_stage_log[app_key][op_key]
else:
stage_totals[op_key] = stage_totals[op_key] + gpu_stage_log[app_key][op_key]
gpu_stage_totals[op_key] = gpu_stage_totals[op_key] + gpu_stage_log[app_key][op_key]
gpu_stage_total = gpu_stage_total + gpu_stage_log[app_key][op_key]

# Create dictionary of execs where speedup factors can be calculated
scores_dict = {}

if 'Filter' in cpu_stage_totals and 'GpuFilter' in gpu_stage_totals:
scores_dict["FilterExec"] = str(round(cpu_stage_totals['Filter'] / gpu_stage_totals['GpuFilter'], 2))
if 'SortMergeJoin' in cpu_stage_totals and 'GpuShuffledHashJoin' in gpu_stage_totals:
scores_dict["SortExec"] = str(round(cpu_stage_totals['SortMergeJoin'] / gpu_stage_totals['GpuShuffledHashJoin'], 2))
if 'BroadcastHashJoin' in cpu_stage_totals and 'GpuBroadcastHashJoin' in gpu_stage_totals:
scores_dict["BroadcastHashJoinExec"] = str(round(cpu_stage_totals['BroadcastHashJoin'] / gpu_stage_totals['GpuBroadcastHashJoin'], 2))
if 'Exchange' in cpu_stage_totals and 'GpuColumnarExchange' in gpu_stage_totals:
scores_dict["ShuffleExchangeExec"] = str(round(cpu_stage_totals['Exchange'] / gpu_stage_totals['GpuColumnarExchange'], 2))
if 'HashAggregate' in cpu_stage_totals and 'GpuHashAggregate' in gpu_stage_totals:
scores_dict["HashAggregateExec"] = str(round(cpu_stage_totals['HashAggregate'] / gpu_stage_totals['GpuHashAggregate'], 2))
if all(cpu_keys in cpu_stage_totals for cpu_keys in ('SortMergeJoin', 'Sort' )) and all(gpu_keys in gpu_stage_totals for gpu_keys in ('GpuShuffledHashJoin', 'GpuSort')):
scores_dict["SortMergeJoinExec"] = str(round((cpu_stage_totals['SortMergeJoin'] + cpu_stage_totals['Sort']) / (gpu_stage_totals['GpuShuffledHashJoin'] + gpu_stage_totals['GpuSort']), 2))

overall_speedup = str(round(cpu_stage_total/gpu_stage_total, 2))

# Print out node metrics (if verbose)
if verbose:
print("# Operator metrics ")
for key in stage_totals:
print(key + "," + str(stage_totals[key]))
print("CPU Total," + str(cpu_stage_total))
print("GPU Total," + str(gpu_stage_total))

# Print out speedup factors
print("# Speedup Factors ")
print("FilterExec," + str(round(stage_totals['Filter'] / stage_totals['GpuFilter'], 2)))
print("SortExec," + str(round(stage_totals['SortMergeJoin'] / stage_totals['GpuShuffledHashJoin'], 2)))
print("BroadcastHashJoinExec," + str(round(stage_totals['BroadcastHashJoin'] / stage_totals['GpuBroadcastHashJoin'], 2)))
print("ShuffleExchangeExec," + str(round(stage_totals['Exchange'] / stage_totals['GpuColumnarExchange'], 2)))
print("HashAggregateExec," + str(round(stage_totals['HashAggregate'] / stage_totals['GpuHashAggregate'], 2)))
print("SortMergeJoinExec," + str(round((stage_totals['SortMergeJoin']+stage_totals['Sort']) / (stage_totals['GpuShuffledHashJoin']+stage_totals['GpuSort']), 2)))
print("# CPU Operator Metrics")
for key in cpu_stage_totals:
print(key + " = " + str(cpu_stage_totals[key]))
print("# GPU Operator Metrics")
for key in gpu_stage_totals:
print(key + " = " + str(gpu_stage_totals[key]))
print("# Summary Metrics")
print("CPU Total = " + str(cpu_stage_total))
print("GPU Total = " + str(gpu_stage_total))
print("Overall speedup = " + overall_speedup)

# Print out individual exec speedup factors
print("# Speedup Factors ")
for key in scores_dict:
print(f"{key} = {scores_dict[key]}")

# Load in list of operators and set initial values to default speedup
scores_df = pd.read_csv("operatorsList.csv")
scores_df["Score"] = overall_speedup

# Update operators that are found in benchmark
for key in scores_dict:
scores_df.loc[scores_df['CPUOperator'] == key, 'Score'] = scores_dict[key]

# Add in hard-coded defaults
defaults_df = pd.read_csv("defaultScores.csv")

# Generate output CSV file
final_df = pd.concat([scores_df, defaults_df])
final_df.to_csv(output, index=False)
Loading

0 comments on commit 5ef39b8

Please sign in to comment.