Skip to content

Commit

Permalink
[FEA] Add user qualification tool options for specifying pricing discounts for CPU or GPU cluster, or both (#583)
Browse files Browse the repository at this point in the history

* initial implementation for adding discount options for both ascli and spark_rapids_user_tools

Signed-off-by: cindyyuanjiang <[email protected]>

* updated est savings cal

Signed-off-by: cindyyuanjiang <[email protected]>

* fix python formatting issues

Signed-off-by: cindyyuanjiang <[email protected]>

* removed redundant code

Signed-off-by: cindyyuanjiang <[email protected]>

---------

Signed-off-by: cindyyuanjiang <[email protected]>
  • Loading branch information
cindyyuanjiang authored Sep 27, 2023
1 parent 1437b7a commit 0afe2e8
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 12 deletions.
42 changes: 40 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,40 @@ def __process_filter_args(self, arg_val: str):
selected_filter = QualFilterApp.fromstring(default_filter_txt)
self.ctxt.set_ctxt('filterApps', selected_filter)

def _process_price_discount_args(self):
def check_discount_percentage(discount_type: str, discount_value: int):
if discount_value < 0 or discount_value > 100:
self.logger.error('%s is out of range [0, 100]', discount_type)
raise RuntimeError(f'Invalid arguments. {discount_type} = {discount_value} is an invalid '
'percentage.')

raw_cpu_discount = self.wrapper_options.get('cpuDiscount')
raw_gpu_discount = self.wrapper_options.get('gpuDiscount')
raw_global_discount = self.wrapper_options.get('globalDiscount')
if raw_global_discount is not None and (raw_cpu_discount is not None or raw_gpu_discount is not None):
self.logger.error('Setting both global_discount and either cpu_discount or '
'gpu_discount is inconsistent.')
raise RuntimeError('Invalid arguments. If global_discount is specified, no additional '
'discount arguments (cpu_discount or gpu_discount) should be set.')
try:
cpu_discount = int(raw_cpu_discount) if raw_cpu_discount is not None else 0
gpu_discount = int(raw_gpu_discount) if raw_gpu_discount is not None else 0
global_discount = int(raw_global_discount) if raw_global_discount is not None else 0
except Exception as ex:
self.logger.error('Discount arguments have incorrect type.')
raise RuntimeError('Invalid arguments. Discount arguments cannot be converted to integer.') from ex

check_discount_percentage('cpu_discount', cpu_discount)
check_discount_percentage('gpu_discount', gpu_discount)
check_discount_percentage('global_discount', global_discount)

if global_discount != 0:
self.ctxt.set_ctxt('cpu_discount', global_discount)
self.ctxt.set_ctxt('gpu_discount', global_discount)
else:
self.ctxt.set_ctxt('cpu_discount', cpu_discount)
self.ctxt.set_ctxt('gpu_discount', gpu_discount)

def _process_custom_args(self):
"""
Qualification tool processes extra arguments:
Expand Down Expand Up @@ -322,6 +356,7 @@ def _process_custom_args(self):

self._process_offline_cluster_args()
self._process_eventlogs_args()
self._process_price_discount_args()
# This is noise to dump everything
# self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])

Expand Down Expand Up @@ -528,8 +563,11 @@ def __calc_apps_cost(self,
'savingRecommendationsRanges')

def get_costs_for_single_app(df_row, estimator: SavingsEstimator) -> pd.Series:
cpu_cost, gpu_cost, est_savings = estimator.get_costs_and_savings(df_row['App Duration'],
df_row['Estimated GPU Duration'])
raw_cpu_cost, raw_gpu_cost, _ = estimator.get_costs_and_savings(df_row['App Duration'],
df_row['Estimated GPU Duration'])
cpu_cost = (100 - self.ctxt.get_ctxt('cpu_discount')) / 100 * raw_cpu_cost
gpu_cost = (100 - self.ctxt.get_ctxt('gpu_discount')) / 100 * raw_gpu_cost
est_savings = 100.0 - ((100.0 * gpu_cost) / cpu_cost)
# We do not want to mistakenly mark a Not-applicable app as Recommended in the savings column
if df_row[speedup_rec_col] == 'Not Applicable':
savings_recommendations = 'Not Applicable'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -87,9 +90,15 @@ def qualification(cpu_cluster: str = None,
It accepts one of the following ("CLUSTER", "JOB" and the default value "MATCH").
"MATCH": keep GPU cluster same number of nodes as CPU cluster;
"CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
"JOB": recommend optimal GPU cluster by cost per job
:param verbose: True or False to enable verbosity to the wrapper script.
"JOB": recommend optimal GPU cluster by cost per job.
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
:param verbose: True or False to enable verbosity to the wrapper script.
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -120,7 +129,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AWS,
cluster=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -85,9 +88,15 @@ def qualification(cpu_cluster: str = None,
It accepts one of the following ("CLUSTER", "JOB" and the default value "MATCH").
"MATCH": keep GPU cluster same number of nodes as CPU cluster;
"CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
"JOB": recommend optimal GPU cluster by cost per job
:param verbose: True or False to enable verbosity to the wrapper script.
"JOB": recommend optimal GPU cluster by cost per job.
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
:param verbose: True or False to enable verbosity to the wrapper script.
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -117,7 +126,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AZURE,
cluster=None,
Expand Down
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/wrappers/dataproc_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -87,6 +90,12 @@ def qualification(cpu_cluster: str = None,
"JOB": recommend optimal GPU cluster by cost per job
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes
:param verbose: True or False to enable verbosity to the wrapper script
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -114,7 +123,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}

tool_obj = QualificationAsLocal(platform_type=CspEnv.DATAPROC,
Expand Down
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -85,6 +88,12 @@ def qualification(cpu_cluster: str = None,
"JOB": recommend optimal GPU cluster by cost per job
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes
:param verbose: True or False to enable verbosity to the wrapper script
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -112,7 +121,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
QualificationAsLocal(platform_type=CspEnv.EMR,
cluster=None,
Expand Down
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/wrappers/onprem_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -65,6 +68,12 @@ def qualification(cpu_cluster: str = None,
"JOB": recommend optimal GPU cluster by cost per job
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes
:param verbose: True or False to enable verbosity to the wrapper script
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -103,7 +112,10 @@ def qualification(cpu_cluster: str = None,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation,
'targetPlatform': target_platform
'targetPlatform': target_platform,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
tool_obj = QualificationAsLocal(platform_type=CspEnv.ONPREM,
output_folder=local_folder,
Expand Down
11 changes: 10 additions & 1 deletion user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,18 @@ class QualifyUserArgModel(ToolUserArgModel):
target_platform: Optional[CspEnv] = None
filter_apps: Optional[QualFilterApp] = None
gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
cpu_discount: Optional[int] = None
gpu_discount: Optional[int] = None
global_discount: Optional[int] = None

def init_tool_args(self):
self.p_args['toolArgs']['platform'] = self.platform
self.p_args['toolArgs']['savingsCalculations'] = True
self.p_args['toolArgs']['filterApps'] = self.filter_apps
self.p_args['toolArgs']['targetPlatform'] = self.target_platform
self.p_args['toolArgs']['cpuDiscount'] = self.cpu_discount
self.p_args['toolArgs']['gpuDiscount'] = self.gpu_discount
self.p_args['toolArgs']['globalDiscount'] = self.global_discount
# check the reshapeType argument
if self.gpu_cluster_recommendation is None:
self.p_args['toolArgs']['gpuClusterRecommendation'] = QualGpuClusterReshapeType.get_default()
Expand Down Expand Up @@ -405,7 +411,10 @@ def build_tools_args(self) -> dict:
'toolsJar': None,
'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
# used to initialize the pricing information
'targetPlatform': self.p_args['toolArgs']['targetPlatform']
'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
'cpuDiscount': self.p_args['toolArgs']['cpuDiscount'],
'gpuDiscount': self.p_args['toolArgs']['gpuDiscount'],
'globalDiscount': self.p_args['toolArgs']['globalDiscount']
}
return wrapped_args

Expand Down
12 changes: 12 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def qualification(self,
target_platform: str = None,
output_folder: str = None,
filter_apps: str = None,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
QualGpuClusterReshapeType.get_default()),
verbose: bool = False):
Expand Down Expand Up @@ -77,6 +80,12 @@ def qualification(self,
'Recommended', or 'Strongly Recommended' based on speedups. "SAVINGS"
lists all the apps that have positive estimated GPU savings except for the apps that
are "Not Applicable"
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param gpu_cluster_recommendation: The type of GPU cluster recommendation to generate.
Requires "Cluster".
Expand All @@ -96,6 +105,9 @@ def qualification(self,
target_platform=target_platform,
output_folder=output_folder,
filter_apps=filter_apps,
cpu_discount=cpu_discount,
gpu_discount=gpu_discount,
global_discount=global_discount,
gpu_cluster_recommendation=gpu_cluster_recommendation)
if qual_args:
tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'],
Expand Down

0 comments on commit 0afe2e8

Please sign in to comment.