Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Add user qualification tool options for specifying pricing discounts for CPU or GPU cluster, or both #583

Merged
merged 4 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion user_tools/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dependencies = [
"azure-storage-blob==12.17.0",
"adlfs==2023.4.0"
]
dynamic=["entry-points", "version"]
dynamic=["version"]
cindyyuanjiang marked this conversation as resolved.
Show resolved Hide resolved

[project.scripts]
spark_rapids_user_tools = "spark_rapids_pytools.wrapper:main"
Expand Down
45 changes: 43 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,43 @@ def __process_filter_args(self, arg_val: str):
selected_filter = QualFilterApp.fromstring(default_filter_txt)
self.ctxt.set_ctxt('filterApps', selected_filter)

def _process_price_discount_args(self):
raw_cpu_discount = self.wrapper_options.get('cpuDiscount')
raw_gpu_discount = self.wrapper_options.get('gpuDiscount')
raw_global_discount = self.wrapper_options.get('globalDiscount')
if raw_global_discount is not None and (raw_cpu_discount is not None or raw_gpu_discount is not None):
self.logger.error('Setting both global_discount and either cpu_discount or '
'gpu_discount is inconsistent.')
raise RuntimeError('Invalid arguments. If global_discount is specified, no additional '
'discount arguments (cpu_discount or gpu_discount) should be set.')
try:
cpu_discount = int(raw_cpu_discount) if raw_cpu_discount is not None else 0
gpu_discount = int(raw_gpu_discount) if raw_gpu_discount is not None else 0
global_discount = int(raw_global_discount) if raw_global_discount is not None else 0
except Exception as ex:
self.logger.error('Discount arguments have incorrect type.')
raise RuntimeError('Invalid arguments. Discount arguments cannot be converted to integer.') from ex

if cpu_discount < 0 or cpu_discount > 100:
nartal1 marked this conversation as resolved.
Show resolved Hide resolved
self.logger.error('cpu_discount is out of range [0, 100]')
raise RuntimeError(f'Invalid arguments. cpu_discount = {cpu_discount} is an invalid '
'percentage.')
if gpu_discount < 0 or gpu_discount > 100:
self.logger.error('gpu_discount is out of range [0, 100]')
raise RuntimeError(f'Invalid arguments. gpu_discount = {gpu_discount} is an invalid '
'percentage.')
if global_discount < 0 or global_discount > 100:
self.logger.error('global_discount is out of range [0, 100]')
raise RuntimeError(f'Invalid arguments. global_discount = {global_discount} is an invalid '
'percentage.')

if global_discount != 0:
self.ctxt.set_ctxt('cpu_discount', global_discount)
self.ctxt.set_ctxt('gpu_discount', global_discount)
else:
self.ctxt.set_ctxt('cpu_discount', cpu_discount)
self.ctxt.set_ctxt('gpu_discount', gpu_discount)

def _process_custom_args(self):
"""
Qualification tool processes extra arguments:
Expand Down Expand Up @@ -322,6 +359,7 @@ def _process_custom_args(self):

self._process_offline_cluster_args()
self._process_eventlogs_args()
self._process_price_discount_args()
# This is noise to dump everything
# self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])

Expand Down Expand Up @@ -528,8 +566,11 @@ def __calc_apps_cost(self,
'savingRecommendationsRanges')

def get_costs_for_single_app(df_row, estimator: SavingsEstimator) -> pd.Series:
cpu_cost, gpu_cost, est_savings = estimator.get_costs_and_savings(df_row['App Duration'],
df_row['Estimated GPU Duration'])
raw_cpu_cost, raw_gpu_cost, _ = estimator.get_costs_and_savings(df_row['App Duration'],
df_row['Estimated GPU Duration'])
cpu_cost = (100 - self.ctxt.get_ctxt('cpu_discount')) / 100 * raw_cpu_cost
gpu_cost = (100 - self.ctxt.get_ctxt('gpu_discount')) / 100 * raw_gpu_cost
est_savings = 100.0 - ((100.0 * gpu_cost) / cpu_cost)
# We do not want to mistakenly mark a Not-applicable app as Recommended in the savings column
if df_row[speedup_rec_col] == 'Not Applicable':
savings_recommendations = 'Not Applicable'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
cindyyuanjiang marked this conversation as resolved.
Show resolved Hide resolved
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -86,9 +89,15 @@ def qualification(cpu_cluster: str = None,
It accepts one of the following ("CLUSTER", "JOB" and the default value "MATCH").
"MATCH": keep GPU cluster same number of nodes as CPU cluster;
"CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
"JOB": recommend optimal GPU cluster by cost per job
:param verbose: True or False to enable verbosity to the wrapper script.
"JOB": recommend optimal GPU cluster by cost per job.
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
:param verbose: True or False to enable verbosity to the wrapper script.
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -119,7 +128,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AWS,
cluster=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -84,9 +87,15 @@ def qualification(cpu_cluster: str = None,
It accepts one of the following ("CLUSTER", "JOB" and the default value "MATCH").
"MATCH": keep GPU cluster same number of nodes as CPU cluster;
"CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
"JOB": recommend optimal GPU cluster by cost per job
:param verbose: True or False to enable verbosity to the wrapper script.
"JOB": recommend optimal GPU cluster by cost per job.
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
:param verbose: True or False to enable verbosity to the wrapper script.
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -116,7 +125,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AZURE,
cluster=None,
Expand Down
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/wrappers/dataproc_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -87,6 +90,12 @@ def qualification(cpu_cluster: str = None,
"JOB": recommend optimal GPU cluster by cost per job
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes
:param verbose: True or False to enable verbosity to the wrapper script
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -114,7 +123,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}

tool_obj = QualificationAsLocal(platform_type=CspEnv.DATAPROC,
Expand Down
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -85,6 +88,12 @@ def qualification(cpu_cluster: str = None,
"JOB": recommend optimal GPU cluster by cost per job
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes
:param verbose: True or False to enable verbosity to the wrapper script
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -112,7 +121,10 @@ def qualification(cpu_cluster: str = None,
'eventlogs': eventlogs,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation
'gpuClusterRecommendation': gpu_cluster_recommendation,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
QualificationAsLocal(platform_type=CspEnv.EMR,
cluster=None,
Expand Down
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/wrappers/onprem_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def qualification(cpu_cluster: str = None,
QualGpuClusterReshapeType.get_default()),
jvm_heap_size: int = 24,
verbose: bool = False,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
**rapids_options) -> None:
"""
The Qualification tool analyzes Spark events generated from CPU based Spark applications to
Expand Down Expand Up @@ -65,6 +68,12 @@ def qualification(cpu_cluster: str = None,
"JOB": recommend optimal GPU cluster by cost per job
:param jvm_heap_size: The maximum heap size of the JVM in gigabytes
:param verbose: True or False to enable verbosity to the wrapper script
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
Expand Down Expand Up @@ -103,7 +112,10 @@ def qualification(cpu_cluster: str = None,
'filterApps': filter_apps,
'toolsJar': tools_jar,
'gpuClusterRecommendation': gpu_cluster_recommendation,
'targetPlatform': target_platform
'targetPlatform': target_platform,
'cpuDiscount': cpu_discount,
'gpuDiscount': gpu_discount,
'globalDiscount': global_discount
}
tool_obj = QualificationAsLocal(platform_type=CspEnv.ONPREM,
output_folder=local_folder,
Expand Down
11 changes: 10 additions & 1 deletion user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,18 @@ class QualifyUserArgModel(ToolUserArgModel):
target_platform: Optional[CspEnv] = None
filter_apps: Optional[QualFilterApp] = None
gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
cpu_discount: Optional[int] = None
gpu_discount: Optional[int] = None
global_discount: Optional[int] = None

def init_tool_args(self):
self.p_args['toolArgs']['platform'] = self.platform
self.p_args['toolArgs']['savingsCalculations'] = True
self.p_args['toolArgs']['filterApps'] = self.filter_apps
self.p_args['toolArgs']['targetPlatform'] = self.target_platform
self.p_args['toolArgs']['cpuDiscount'] = self.cpu_discount
self.p_args['toolArgs']['gpuDiscount'] = self.gpu_discount
self.p_args['toolArgs']['globalDiscount'] = self.global_discount
# check the reshapeType argument
if self.gpu_cluster_recommendation is None:
self.p_args['toolArgs']['gpuClusterRecommendation'] = QualGpuClusterReshapeType.get_default()
Expand Down Expand Up @@ -401,7 +407,10 @@ def build_tools_args(self) -> dict:
'toolsJar': None,
'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
# used to initialize the pricing information
'targetPlatform': self.p_args['toolArgs']['targetPlatform']
'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
'cpuDiscount': self.p_args['toolArgs']['cpuDiscount'],
'gpuDiscount': self.p_args['toolArgs']['gpuDiscount'],
'globalDiscount': self.p_args['toolArgs']['globalDiscount']
}
return wrapped_args

Expand Down
12 changes: 12 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def qualification(self,
target_platform: str = None,
output_folder: str = None,
filter_apps: str = None,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
QualGpuClusterReshapeType.get_default())):
"""The Qualification cmd provides estimated running costs and speedups by migrating Apache
Expand Down Expand Up @@ -75,6 +78,12 @@ def qualification(self,
'Recommended', or 'Strongly Recommended' based on speedups. "SAVINGS"
lists all the apps that have positive estimated GPU savings except for the apps that
are "Not Applicable"
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
integer value (e.g. 30 for 30% discount).
:param gpu_cluster_recommendation: The type of GPU cluster recommendation to generate.
Requires "Cluster".

Expand All @@ -91,6 +100,9 @@ def qualification(self,
target_platform=target_platform,
output_folder=output_folder,
filter_apps=filter_apps,
cpu_discount=cpu_discount,
gpu_discount=gpu_discount,
global_discount=global_discount,
gpu_cluster_recommendation=gpu_cluster_recommendation)
if qual_args:
tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'],
Expand Down
Loading