From fc2c878b5ee5c94fb653a0f51254b1ac07fdbda3 Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Fri, 15 Nov 2024 11:56:22 -0600 Subject: [PATCH] Remove custom-speedup module from user-tools Signed-off-by: Ahmed Hussein (amahussein) Contributes to #1221 This PR remove the python scripts that used to generate custom speedup factors. The scripts are dead since the tools migrated to XGBoost estimates. --- user_tools/custom_speedup_factors/README.md | 56 ---- .../custom_speedup_factors/defaultScores.csv | 19 -- .../generate_speedup_factors.py | 225 ---------------- .../custom_speedup_factors/operatorsList.csv | 254 ------------------ .../validate_qualification_estimates.py | 156 ----------- 5 files changed, 710 deletions(-) delete mode 100644 user_tools/custom_speedup_factors/README.md delete mode 100644 user_tools/custom_speedup_factors/defaultScores.csv delete mode 100644 user_tools/custom_speedup_factors/generate_speedup_factors.py delete mode 100644 user_tools/custom_speedup_factors/operatorsList.csv delete mode 100644 user_tools/custom_speedup_factors/validate_qualification_estimates.py diff --git a/user_tools/custom_speedup_factors/README.md b/user_tools/custom_speedup_factors/README.md deleted file mode 100644 index 2bcdc9466..000000000 --- a/user_tools/custom_speedup_factors/README.md +++ /dev/null @@ -1,56 +0,0 @@ -# Custom Speedup Factors for Workload Qualification - -## Purpose - -Speedup factor estimation for the qualification tool is used for determining the estimated runtime for an application on GPU using Spark RAPIDS. The speedup factors generate multipliers at an exec (and potentially expression) level that are used in the qualification tool for GPU estimates. - -## Prerequisites - -1. python >= 3.8 -2. Necessary Python dependencies: `pandas`, `argparse`, `spark-rapids-user-tools` - -## Generating Custom Speedup Factors - -The high-level process to generate speedup factors for an environment is as follows: - -1. Event log generation - 1. Run the NDS benchmark on CPU cluster along with any other representative jobs and save event log(s). Follow steps documented in the [NDS README](https://github.com/NVIDIA/spark-rapids-benchmarks/blob/dev/nds/README.md) for running the Power Run. - 2. Run the NDS benchmark on GPU cluster along with any other representative jobs and save event log(s). Follow steps documented in the [NDS README](https://github.com/NVIDIA/spark-rapids-benchmarks/blob/dev/nds/README.md) for running the Power Run. - 3. Note that the benchmark data size (referred to as scale factor) should match the representative data size for your workloads. If your workloads are 1TB in size, then you should use SF1000. If your workloads are 500GB in size, then you should use SF500. -2. Job profiler analysis - 1. Run the Spark RAPIDS profiling tool against the CPU and GPU event log to get stage-level duration metrics. -``` -spark_rapids_user_tools onprem profiling --csv --eventlogs CPU-3k --local_folder CPU-3k-profile - -spark_rapids_user_tools onprem profiling --csv --eventlogs GPU-3k --local_folder GPU-3k-profile -``` -3. Speedup factor generation - 1. Run the speedup factor generation script, passing the CPU and GPU profiler output along with a CSV output filename. -``` -python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile --output newScores.csv -``` - -The script will generate the new scores in the output specified by the `--output` argument. - -## Running Workload Qualification with Custom Speedup Factors - -Now that you have a custom *operatorsScore.csv* file, you can run the Spark RAPIDS qualification tool using it to get estimations applicable for your environment. Here is the command to run with a custom speedup factor file: -``` -spark_rapids_user_tools onprem qualification --speedup-factor-file newScores.csv --eventlogs -``` - -## Validating Custom Speedup Factors - -There is a utility script in the directory to allow for validation of custom speedup factors given CPU and GPU event logs for a corresponding job or set of jobs. By default, the script will generate custom speedup factors, run the qualification tool with the generated custom speed up factors, and then generate validation metrics for the estimations against the actuals. - -Example execution of the script: -``` -python validate_qualification_estimates.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup -``` - -The script also allows you to pass in a custom speedup factor file if you have previously generated them. Example: -``` -python validate_qualification_estimates.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup --speedups test-scores.csv -``` - -Other options include passing in the CPU and/or GPU profiler output if that has already been done via the `cpu_profile` and `gpu_profile` arguments. Additionally, you can pass in a custom tools jar via `--jar` if that is needed. diff --git a/user_tools/custom_speedup_factors/defaultScores.csv b/user_tools/custom_speedup_factors/defaultScores.csv deleted file mode 100644 index c57bc5aa9..000000000 --- a/user_tools/custom_speedup_factors/defaultScores.csv +++ /dev/null @@ -1,19 +0,0 @@ -CPUOperator,Score -AggregateInPandasExec,1.2 -ArrowEvalPythonExec,1.2 -FlatMapGroupsInPandasExec,1.2 -FlatMapCoGroupsInPandasExec,1.2 -MapInPandasExec,1.2 -WindowInPandasExec,1.2 -KMeans-pyspark,8.86 -KMeans-scala,1 -PCA-pyspark,2.24 -PCA-scala,2.69 -LinearRegression-pyspark,2 -LinearRegression-scala,1 -RandomForestClassifier-pyspark,6.31 -RandomForestClassifier-scala,1 -RandomForestRegressor-pyspark,3.66 -RandomForestRegressor-scala,1 -XGBoost-pyspark,1 -XGBoost-scala,3.31 diff --git a/user_tools/custom_speedup_factors/generate_speedup_factors.py b/user_tools/custom_speedup_factors/generate_speedup_factors.py deleted file mode 100644 index 7d09f142c..000000000 --- a/user_tools/custom_speedup_factors/generate_speedup_factors.py +++ /dev/null @@ -1,225 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) 2023-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Spark RAPIDS speedup factor generation script""" - -import argparse -import os - -import pandas as pd - - -def list_directories(root_name): - for root, dirs, files in os.walk(root_name): - return dirs - return [] - - -parser = argparse.ArgumentParser(description="Speedup Factor Analysis") -parser.add_argument("--cpu", type=str, help="Directory of CPU profiler logs", required=True) -parser.add_argument("--gpu", type=str, help="Directory of GPU profiler logs", required=True) -parser.add_argument("--output", type=str, help="Filename for custom speedup factors", required=True) -parser.add_argument("--verbose", action="store_true", help="flag to generate full verbose output for logging raw node results") -parser.add_argument("--chdir", action="store_true", help="flag to change to work dir that's the script located") -args = parser.parse_args() - -cpu_dir = args.cpu -gpu_dir = args.gpu -output = args.output -verbose = args.verbose - -cpu_stage_log = {} -gpu_stage_log = {} -cpu_duration = 0.0 -gpu_duration = 0.0 - -min_speedup = 1.0 - -if args.chdir: - # Change to work dir that's the script located - os.chdir(os.path.dirname(__file__)) - -# CPU log parsing -for app in list_directories(cpu_dir): - - # - figure out query from application_info.csv - app_info = pd.read_csv(cpu_dir + "/" + app + "/application_information.csv") - app_name = app_info.loc[0]["appName"] - cpu_duration = cpu_duration + app_info.loc[0]["duration"] - cpu_stage_log[app_name] = {} - - # - load wholestagecodegen_mapping.csv into a dictionary for lookups (CPU only) - mapping_info = pd.read_csv(cpu_dir + "/" + app + "/wholestagecodegen_mapping.csv") - mapping_info = mapping_info.groupby(['SQL Node'])['Child Node'].apply(','.join).reset_index() - - # - process sql_plan_metrics_for_application.csv - # - load in "duration" (CPU) - # - replace WholeStageCodegen (CPU only) with list of operators from mapping lookup file - # - mapping_info.parent = sql_times.nodeName - cpu_sql_info = pd.read_csv(cpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv") - cpu_sql_times = cpu_sql_info[cpu_sql_info["name"] == "duration"] - cpu_sql_combined = cpu_sql_times.set_index('nodeName').join(mapping_info.set_index('SQL Node'), how='left') - - # - parse WholeStageCodegen durations with child node mapping - cpu_sql_times_df = cpu_sql_combined[['Child Node', 'total']] - - for index, row in cpu_sql_times_df.iterrows(): - operators = str(row['Child Node']).split(',') - duration = row['total']/len(operators)/1000.0 - for operator in operators: - if operator in cpu_stage_log[app_name]: - cpu_stage_log[app_name][operator] = cpu_stage_log[app_name][operator] + duration - else: - cpu_stage_log[app_name][operator] = duration - - # - parse top-level execs from sql_to_stage_information.csv - cpu_stage_info = pd.read_csv(cpu_dir + "/" + app + "/sql_to_stage_information.csv") - cpu_stage_times = cpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']] - cpu_stage_times_df = cpu_stage_times.dropna() - - for index, row in cpu_stage_times_df.iterrows(): - node_list = str(row['SQL Nodes(IDs)']) - operators = node_list.split(',') - duration = row['Stage Duration']/(len(operators)-node_list.count("WholeStageCodegen")) - - for operator in operators: - if "WholeStageCodegen" in operator: - continue - - op_key = operator.split('(')[0] - if op_key in cpu_stage_log[app_name]: - cpu_stage_log[app_name][op_key] = cpu_stage_log[app_name][op_key] + duration - else: - cpu_stage_log[app_name][op_key] = duration - -# GPU log parsing -for app in list_directories(gpu_dir): - - # - figure out query from application_info.csv - app_info = pd.read_csv(gpu_dir + "/" + app + "/application_information.csv") - app_name = app_info.loc[0]["appName"] - gpu_duration = gpu_duration + app_info.loc[0]["duration"] - gpu_stage_log[app_name] = {} - - # - process sql_to_stage_information.csv to get stage durations - # - split up duration by operators listed in each stage - gpu_stage_info = pd.read_csv(gpu_dir + "/" + app + "/sql_to_stage_information.csv") - gpu_stage_times = gpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']] - - for index, row in gpu_stage_times.iterrows(): - operators = str(row['SQL Nodes(IDs)']).split(',') - duration = row['Stage Duration']/len(operators) - for operator in operators: - op_key = operator.split('(')[0] - if op_key in gpu_stage_log[app_name]: - gpu_stage_log[app_name][op_key] = gpu_stage_log[app_name][op_key] + duration - else: - gpu_stage_log[app_name][op_key] = duration - -cpu_stage_totals = {} -gpu_stage_totals = {} -cpu_stage_total = 0.0 -gpu_stage_total = 0.0 - -# Sum up SQL operators for each operator found in CPU and GPU -for app_key in cpu_stage_log: - for op_key in cpu_stage_log[app_key]: - if op_key not in cpu_stage_totals: - cpu_stage_totals[op_key] = cpu_stage_log[app_key][op_key] - else: - cpu_stage_totals[op_key] = cpu_stage_totals[op_key] + cpu_stage_log[app_key][op_key] - cpu_stage_total = cpu_stage_total + cpu_stage_log[app_key][op_key] - -for app_key in gpu_stage_log: - for op_key in gpu_stage_log[app_key]: - if op_key not in gpu_stage_totals: - gpu_stage_totals[op_key] = gpu_stage_log[app_key][op_key] - else: - gpu_stage_totals[op_key] = gpu_stage_totals[op_key] + gpu_stage_log[app_key][op_key] - gpu_stage_total = gpu_stage_total + gpu_stage_log[app_key][op_key] - -# Create dictionary of execs where speedup factors can be calculated -scores_dict = {} - -# Scan operators -if 'Scan parquet ' in cpu_stage_totals and 'GpuScan parquet ' in gpu_stage_totals: - scores_dict["BatchScanExec"] = str(round(cpu_stage_totals['Scan parquet '] / gpu_stage_totals['GpuScan parquet '], 2)) - scores_dict["FileSourceScanExec"] = str(round(cpu_stage_totals['Scan parquet '] / gpu_stage_totals['GpuScan parquet '], 2)) -if 'Scan orc ' in cpu_stage_totals and 'GpuScan orc ' in gpu_stage_totals: - scores_dict["BatchScanExec"] = str(round(cpu_stage_totals['Scan orc '] / gpu_stage_totals['GpuScan orc '], 2)) - scores_dict["FileSourceScanExec"] = str(round(cpu_stage_totals['Scan orc '] / gpu_stage_totals['GpuScan orc '], 2)) - -# Other operators -if 'Expand' in cpu_stage_totals and 'GpuExpand' in gpu_stage_totals: - scores_dict["ExpandExec"] = str(round(cpu_stage_totals['Expand'] / gpu_stage_totals['GpuExpand'], 2)) -if 'CartesianProduct' in cpu_stage_totals and 'GpuCartesianProduct' in gpu_stage_totals: - scores_dict["CartesianProductExec"] = str(round(cpu_stage_totals['CartesianProduct'] / gpu_stage_totals['GpuCartesianProduct'], 2)) -if 'Filter' in cpu_stage_totals and 'GpuFilter' in gpu_stage_totals: - scores_dict["FilterExec"] = str(round(cpu_stage_totals['Filter'] / gpu_stage_totals['GpuFilter'], 2)) -if 'SortMergeJoin' in cpu_stage_totals and 'GpuShuffledHashJoin' in gpu_stage_totals: - scores_dict["SortMergeJoinExec"] = str(round(cpu_stage_totals['SortMergeJoin'] / gpu_stage_totals['GpuShuffledHashJoin'], 2)) -if 'BroadcastHashJoin' in cpu_stage_totals and 'GpuBroadcastHashJoin' in gpu_stage_totals: - scores_dict["BroadcastHashJoinExec"] = str(round(cpu_stage_totals['BroadcastHashJoin'] / gpu_stage_totals['GpuBroadcastHashJoin'], 2)) -if 'Exchange' in cpu_stage_totals and 'GpuColumnarExchange' in gpu_stage_totals: - scores_dict["ShuffleExchangeExec"] = str(round(cpu_stage_totals['Exchange'] / gpu_stage_totals['GpuColumnarExchange'], 2)) -if 'HashAggregate' in cpu_stage_totals and 'GpuHashAggregate' in gpu_stage_totals: - scores_dict["HashAggregateExec"] = str(round(cpu_stage_totals['HashAggregate'] / gpu_stage_totals['GpuHashAggregate'], 2)) - scores_dict["ObjectHashAggregateExec"] = str(round(cpu_stage_totals['HashAggregate'] / gpu_stage_totals['GpuHashAggregate'], 2)) - scores_dict["SortAggregateExec"] = str(round(cpu_stage_totals['HashAggregate'] / gpu_stage_totals['GpuHashAggregate'], 2)) -if 'TakeOrderedAndProject' in cpu_stage_totals and 'GpuTopN' in gpu_stage_totals: - scores_dict["TakeOrderedAndProjectExec"] = str(round(cpu_stage_totals['TakeOrderedAndProject'] / gpu_stage_totals['GpuTopN'], 2)) -if 'BroadcastNestedLoopJoin' in cpu_stage_totals and 'GpuBroadcastNestedLoopJoin' in gpu_stage_totals: - scores_dict["BroadcastNestedLoopJoinExec"] = str(round(cpu_stage_totals['BroadcastNestedLoopJoin'] / gpu_stage_totals['GpuBroadcastNestedLoopJoin'], 2)) - -# Set minimum to 1.0 for speedup factors -for key in scores_dict: - if float(scores_dict[key]) < min_speedup: - scores_dict[key] = f"{min_speedup}" - -# Set overall speedup for default value for execs not in logs -overall_speedup = str(max(min_speedup, round(cpu_duration/gpu_duration, 2))) - -# Print out node metrics (if verbose) -if verbose: - print("# CPU Operator Metrics") - for key in cpu_stage_totals: - print(key + " = " + str(cpu_stage_totals[key])) - print("# GPU Operator Metrics") - for key in gpu_stage_totals: - print(key + " = " + str(gpu_stage_totals[key])) - print("# Summary Metrics") - print("CPU Total = " + str(cpu_stage_total)) - print("GPU Total = " + str(gpu_stage_total)) - print("Overall speedup = " + overall_speedup) - - # Print out individual exec speedup factors - print("# Speedup Factors ") - for key in scores_dict: - print(f"{key} = {scores_dict[key]}") - -# Load in list of operators and set initial values to default speedup -scores_df = pd.read_csv("operatorsList.csv") -scores_df["Score"] = overall_speedup - -# Update operators that are found in benchmark -for key in scores_dict: - scores_df.loc[scores_df['CPUOperator'] == key, 'Score'] = scores_dict[key] - -# Add in hard-coded defaults -defaults_df = pd.read_csv("defaultScores.csv") - -# Generate output CSV file -final_df = pd.concat([scores_df, defaults_df]) -final_df.to_csv(output, index=False) diff --git a/user_tools/custom_speedup_factors/operatorsList.csv b/user_tools/custom_speedup_factors/operatorsList.csv deleted file mode 100644 index 986b88ed7..000000000 --- a/user_tools/custom_speedup_factors/operatorsList.csv +++ /dev/null @@ -1,254 +0,0 @@ -CPUOperator -CoalesceExec -CollectLimitExec -ExpandExec -FileSourceScanExec -FilterExec -GenerateExec -GlobalLimitExec -LocalLimitExec -ProjectExec -RangeExec -SampleExec -SortExec -TakeOrderedAndProjectExec -HashAggregateExec -ObjectHashAggregateExec -SortAggregateExec -DataWritingCommandExec -ExecutedCommandExec -BatchScanExec -ShuffleExchangeExec -BroadcastHashJoinExec -BroadcastNestedLoopJoinExec -CartesianProductExec -ShuffledHashJoinExec -SortMergeJoinExec -WindowExec -Abs -Acos -Acosh -Add -AggregateExpression -Alias -And -ApproximatePercentile -ArrayContains -ArrayExcept -ArrayExists -ArrayIntersect -ArrayMax -ArrayMin -ArrayRemove -ArrayRepeat -ArrayTransform -ArrayUnion -ArraysOverlap -ArraysZip -Asin -Asinh -AtLeastNNonNulls -Atan -Atanh -AttributeReference -Average -BRound -BitLength -BitwiseAnd -BitwiseNot -BitwiseOr -BitwiseXor -CaseWhen -Cbrt -Ceil -CheckOverflow -Coalesce -CollectList -CollectSet -Concat -ConcatWs -Contains -Conv -Cos -Cosh -Cot -Count -CreateArray -CreateMap -CreateNamedStruct -CurrentRow$ -DateAdd -DateAddInterval -DateDiff -DateFormatClass -DateSub -DayOfMonth -DayOfWeek -DayOfYear -DenseRank -Divide -DynamicPruningExpression -ElementAt -EndsWith -EqualNullSafe -EqualTo -Exp -Explode -Expm1 -First -Flatten -Floor -FormatNumber -FromUTCTimestamp -FromUnixTime -GetArrayItem -GetArrayStructFields -GetJsonObject -GetMapValue -GetStructField -GetTimestamp -GreaterThan -GreaterThanOrEqual -Greatest -HiveGenericUDF -HiveSimpleUDF -Hour -Hypot -If -In -InSet -InitCap -InputFileBlockLength -InputFileBlockStart -InputFileName -IntegralDivide -IsNaN -IsNotNull -IsNull -JsonToStructs -JsonTuple -KnownFloatingPointNormalized -KnownNotNull -Lag -LambdaFunction -Last -LastDay -Lead -Least -Length -LessThan -LessThanOrEqual -Like -Literal -Log -Log10 -Log1p -Log2 -Logarithm -Lower -MakeDecimal -MapConcat -MapEntries -MapFilter -MapKeys -MapValues -Max -Md5 -MicrosToTimestamp -MillisToTimestamp -Min -Minute -MonotonicallyIncreasingID -Month -Multiply -Murmur3Hash -NaNvl -NamedLambdaVariable -NormalizeNaNAndZero -Not -NthValue -OctetLength -Or -Percentile -PercentRank -PivotFirst -Pmod -PosExplode -Pow -PreciseTimestampConversion -PromotePrecision -PythonUDF -Quarter -RLike -RaiseError -Rand -Rank -RegExpExtract -RegExpExtractAll -RegExpReplace -Remainder -ReplicateRows -Reverse -Rint -Round -RowNumber -ScalaUDF -ScalarSubquery -Second -SecondsToTimestamp -Sequence -ShiftLeft -ShiftRight -ShiftRightUnsigned -Signum -Sin -Sinh -Size -SortArray -SortOrder -SparkPartitionID -SpecifiedWindowFrame -Sqrt -Stack -StartsWith -StddevPop -StddevSamp -StringInstr -StringLPad -StringLocate -StringRPad -StringRepeat -StringReplace -StringSplit -StringToMap -StringTranslate -StringTrim -StringTrimLeft -StringTrimRight -StructsToJson -Substring -SubstringIndex -Subtract -Sum -Tan -Tanh -TimeAdd -ToDegrees -ToRadians -ToUnixTimestamp -TransformKeys -TransformValues -UnaryMinus -UnaryPositive -UnboundedFollowing$ -UnboundedPreceding$ -UnixTimestamp -UnscaledValue -Upper -VariancePop -VarianceSamp -WeekDay -WindowExpression -WindowSpecDefinition -XxHash64 -Year diff --git a/user_tools/custom_speedup_factors/validate_qualification_estimates.py b/user_tools/custom_speedup_factors/validate_qualification_estimates.py deleted file mode 100644 index 26e6b8ae2..000000000 --- a/user_tools/custom_speedup_factors/validate_qualification_estimates.py +++ /dev/null @@ -1,156 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) 2023, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Spark RAPIDS speedup factor validation script""" - -import argparse -import os -import glob -import subprocess - -import pandas as pd -from tabulate import tabulate - -parser = argparse.ArgumentParser(description="Speedup Factor Validation") -parser.add_argument("--cpu_log", type=str, help="Directory of CPU event log(s)", required=True) -parser.add_argument("--gpu_log", type=str, help="Directory of GPU event log(s)", required=True) -parser.add_argument("--output", type=str, help="Output folder for storing logs", required=True) -parser.add_argument("--speedups", type=str, help="Custom speedup factor file") -parser.add_argument("--cpu_profile", type=str, help="Directory of CPU profiler log(s)") -parser.add_argument("--gpu_profile", type=str, help="Directory of GPU profiler log(s)") -parser.add_argument("--jar", type=str, help="Custom tools jar") -parser.add_argument("--verbose", action="store_true", help="flag to generate full verbose output for logging raw node results") -args = parser.parse_args() - -cpu_log = args.cpu_log -gpu_log = args.gpu_log -cpu_profile = args.cpu_profile -gpu_profile = args.gpu_profile -output = args.output -speedups = args.speedups -jar = args.jar -verbose = args.verbose - -print(f"Output folder = {output}") -print(f"CPU event log = {cpu_log}") -print(f"GPU event log = {gpu_log}") - -subprocess.run(f"rm -rf {output}", shell=True) - -speedups_arg = "" -if speedups is not None: - speedups_arg = f"--speedup-factor-file {speedups}" -else: - speedups_arg = f"--speedup-factor-file {output}/generatedScores.csv" - -jar_arg = "" -if jar is not None: - jar_arg = f"--tools_jar {jar}" - -# Generate speedup factors - -### run GPU profiler if needed -gpu_profile_dir = "" -if gpu_profile is not None: - gpu_profile_dir = gpu_profile -else: - gpu_profile_dir = f"{output}/gpu_profile" - subprocess.run(f"spark_rapids_user_tools onprem profiling --csv {jar_arg} --local_folder {gpu_profile_dir} --eventlogs {gpu_log}", shell=True) - -if speedups is None: - ### run CPU profiler if needed - cpu_profile_dir = "" - if cpu_profile is not None: - cpu_profile_dir = cpu_profile - else: - cpu_profile_dir = f"{output}/cpu_profile" - subprocess.run(f"spark_rapids_user_tools onprem profiling --csv {jar_arg} --local_folder {cpu_profile_dir} --eventlogs {cpu_log}", shell=True) - - ### run speedup factor generation - subprocess.run(f"python generate_speedup_factors.py --cpu {cpu_profile_dir}/*/rapids_4_spark_profile --gpu {gpu_profile_dir}/*/rapids_4_spark_profile --output {output}/generatedScores.csv", shell=True) - -# Run qualification - -### set speedup factors to input or generated -speedups_arg = "" -if speedups is not None: - speedups_arg = f"--speedup-factor-file {speedups}" -else: - speedups_arg = f"--speedup-factor-file {output}/generatedScores.csv" - -### run CPU qualification -cpu_tmp_dir = f"{output}/cpu" -subprocess.run(f"spark_rapids_user_tools onprem qualification {speedups_arg} {jar_arg} --local_folder {cpu_tmp_dir} --eventlogs {cpu_log}", shell=True) - -# Parse and validate results - -### CPU log parsing -cpu_app_info = pd.read_csv(glob.glob(f"{cpu_tmp_dir}/*/rapids_4_spark_qualification_output/rapids_4_spark_qualification_output.csv")[0]) -cpu_query_info = cpu_app_info[["App Name", "App Duration", "Estimated GPU Duration", "Estimated GPU Speedup"]] - -### GPU log parsing -gpu_query_info = pd.DataFrame(columns = ['App Name', 'GPU Duration']) -counter = 0 - -for app in glob.glob(f"{gpu_profile_dir}/*/rapids_4_spark_profile/*/application_information.csv"): - app_info = pd.read_csv(app) - new_row = pd.DataFrame({'App Name': app_info.loc[0]["appName"], 'GPU Duration': app_info.loc[0]["duration"]}, index=[counter]) - gpu_query_info = pd.concat([gpu_query_info, new_row]) - counter = counter+1 - -merged_info = cpu_query_info.merge(gpu_query_info, left_on='App Name', right_on='App Name') -merged_info["Duration Error (sec)"] = (merged_info["Estimated GPU Duration"] - merged_info["GPU Duration"])/1000.0 -merged_info["Duration Error (pct)"] = (100.0*(merged_info["Estimated GPU Duration"] - merged_info["GPU Duration"])/merged_info["Estimated GPU Duration"]).apply(lambda x: round(x,2)) -merged_info["GPU Speedup"] = (merged_info["App Duration"]/merged_info["GPU Duration"]).apply(lambda x: round(x,2)) -merged_info["Speedup Error (abs)"] = merged_info["Estimated GPU Speedup"] - merged_info["GPU Speedup"] -merged_info["Speedup Error (pct)"] = (100.0*(merged_info["Estimated GPU Speedup"] - merged_info["GPU Speedup"])/merged_info["Estimated GPU Speedup"]).apply(lambda x: round(x,2)) - -print("==================================================") -print(" Application Details") -print("==================================================") -print(tabulate(merged_info, headers='keys', tablefmt='psql')) - -print("==================================================") -print(" Duration Error Metrics ") -print("==================================================") -print("Average duration error (seconds) = " + str(round(merged_info["Duration Error (sec)"].mean(),2))) -print("Median duration error (seconds) = " + str(round(merged_info["Duration Error (sec)"].median(),2))) -print("Min duration error (seconds) = " + str(round(merged_info["Duration Error (sec)"].min(),2))) -print("Max duration error (seconds) = " + str(round(merged_info["Duration Error (sec)"].max(),2))) -print("Average duration error (diff pct) = " + str(round(merged_info["Duration Error (pct)"].mean(),2))) -print("Median duration error (diff pct) = " + str(round(merged_info["Duration Error (pct)"].median(),2))) -print("Max duration error (diff pct) = " + str(round(merged_info["Duration Error (pct)"].max(),2))) -print("Average duration error (diff sec) = " + str(round(merged_info["Duration Error (sec)"].abs().mean(),2))) -print("Median duration error (diff sec) = " + str(round(merged_info["Duration Error (sec)"].abs().median(),2))) -print("Max duration error (diff sec) = " + str(round(merged_info["Duration Error (sec)"].abs().max(),2))) -print("Average duration error (abs pct) = " + str(round(merged_info["Duration Error (pct)"].abs().mean(),2))) -print("Median duration error (abs pct) = " + str(round(merged_info["Duration Error (pct)"].abs().median(),2))) -print("Max duration error (abs pct) = " + str(round(merged_info["Duration Error (pct)"].abs().max(),2))) -print("==================================================") -print(" Speedup Error Metrics ") -print("==================================================") -print("Average speedup error (diff) = " + str(round(merged_info["Speedup Error (abs)"].mean(),2))) -print("Median speedup error (diff) = " + str(round(merged_info["Speedup Error (abs)"].median(),2))) -print("Min speedup error (diff) = " + str(round(merged_info["Speedup Error (abs)"].min(),2))) -print("Max speedup error (diff) = " + str(round(merged_info["Speedup Error (abs)"].max(),2))) -print("Average speedup error (diff pct) = " + str(round(merged_info["Speedup Error (pct)"].mean(),2))) -print("Median speedup error (diff pct = " + str(round(merged_info["Speedup Error (pct)"].median(),2))) -print("Max speedup error (diff pct) = " + str(round(merged_info["Speedup Error (pct)"].max(),2))) -print("Average speedup error (abs diff) = " + str(round(merged_info["Speedup Error (abs)"].abs().mean(),2))) -print("Median speedup error (abs diff) = " + str(round(merged_info["Speedup Error (abs)"].abs().median(),2))) -print("Max speedup error (abs diff) = " + str(round(merged_info["Speedup Error (abs)"].abs().max(),2))) -print("Average speedup error (abs pct) = " + str(round(merged_info["Speedup Error (pct)"].abs().mean(),2))) -print("Median speedup error (abs pct) = " + str(round(merged_info["Speedup Error (pct)"].abs().median(),2))) -print("Max speedup error (abs pct) = " + str(round(merged_info["Speedup Error (pct)"].abs().max(),2)))