diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
index a8861c6a1..fe6e10047 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -291,6 +291,40 @@ def __process_filter_args(self, arg_val: str):
             selected_filter = QualFilterApp.fromstring(default_filter_txt)
         self.ctxt.set_ctxt('filterApps', selected_filter)
 
+    def _process_price_discount_args(self):
+        def check_discount_percentage(discount_type: str, discount_value: int):
+            if discount_value < 0 or discount_value > 100:
+                self.logger.error('%s is out of range [0, 100]', discount_type)
+                raise RuntimeError(f'Invalid arguments. {discount_type} = {discount_value} is an invalid '
+                                   'percentage.')
+
+        raw_cpu_discount = self.wrapper_options.get('cpuDiscount')
+        raw_gpu_discount = self.wrapper_options.get('gpuDiscount')
+        raw_global_discount = self.wrapper_options.get('globalDiscount')
+        if raw_global_discount is not None and (raw_cpu_discount is not None or raw_gpu_discount is not None):
+            self.logger.error('Setting both global_discount and either cpu_discount or '
+                              'gpu_discount is inconsistent.')
+            raise RuntimeError('Invalid arguments. If global_discount is specified, no additional '
+                               'discount arguments (cpu_discount or gpu_discount) should be set.')
+        try:
+            cpu_discount = int(raw_cpu_discount) if raw_cpu_discount is not None else 0
+            gpu_discount = int(raw_gpu_discount) if raw_gpu_discount is not None else 0
+            global_discount = int(raw_global_discount) if raw_global_discount is not None else 0
+        except Exception as ex:
+            self.logger.error('Discount arguments have incorrect type.')
+            raise RuntimeError('Invalid arguments. Discount arguments cannot be converted to integer.') from ex
+
+        check_discount_percentage('cpu_discount', cpu_discount)
+        check_discount_percentage('gpu_discount', gpu_discount)
+        check_discount_percentage('global_discount', global_discount)
+
+        if global_discount != 0:
+            self.ctxt.set_ctxt('cpu_discount', global_discount)
+            self.ctxt.set_ctxt('gpu_discount', global_discount)
+        else:
+            self.ctxt.set_ctxt('cpu_discount', cpu_discount)
+            self.ctxt.set_ctxt('gpu_discount', gpu_discount)
+
     def _process_custom_args(self):
         """
         Qualification tool processes extra arguments:
@@ -322,6 +356,7 @@ def _process_custom_args(self):
 
         self._process_offline_cluster_args()
         self._process_eventlogs_args()
+        self._process_price_discount_args()
         # This is noise to dump everything
         # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
 
@@ -528,8 +563,11 @@ def __calc_apps_cost(self,
                                                        'savingRecommendationsRanges')
 
         def get_costs_for_single_app(df_row, estimator: SavingsEstimator) -> pd.Series:
-            cpu_cost, gpu_cost, est_savings = estimator.get_costs_and_savings(df_row['App Duration'],
-                                                                              df_row['Estimated GPU Duration'])
+            raw_cpu_cost, raw_gpu_cost, _ = estimator.get_costs_and_savings(df_row['App Duration'],
+                                                                            df_row['Estimated GPU Duration'])
+            cpu_cost = (100 - self.ctxt.get_ctxt('cpu_discount')) / 100 * raw_cpu_cost
+            gpu_cost = (100 - self.ctxt.get_ctxt('gpu_discount')) / 100 * raw_gpu_cost
+            est_savings = 100.0 - ((100.0 * gpu_cost) / cpu_cost)
             # We do not want to mistakenly mark a Not-applicable app as Recommended in the savings column
             if df_row[speedup_rec_col] == 'Not Applicable':
                 savings_recommendations = 'Not Applicable'
diff --git a/user_tools/src/spark_rapids_pytools/wrappers/databricks_aws_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/databricks_aws_wrapper.py
index 6b83a04ce..fc2ff55bd 100644
--- a/user_tools/src/spark_rapids_pytools/wrappers/databricks_aws_wrapper.py
+++ b/user_tools/src/spark_rapids_pytools/wrappers/databricks_aws_wrapper.py
@@ -42,6 +42,9 @@ def qualification(cpu_cluster: str = None,
                       QualGpuClusterReshapeType.get_default()),
                   jvm_heap_size: int = 24,
                   verbose: bool = False,
+                  cpu_discount: int = None,
+                  gpu_discount: int = None,
+                  global_discount: int = None,
                   **rapids_options) -> None:
     """
     The Qualification tool analyzes Spark events generated from CPU based Spark applications to
@@ -87,9 +90,15 @@ def qualification(cpu_cluster: str = None,
            It accepts one of the following ("CLUSTER", "JOB" and the default value "MATCH").
            "MATCH": keep GPU cluster same number of nodes as CPU cluster;
            "CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
-           "JOB": recommend optimal GPU cluster by cost per job
-    :param verbose: True or False to enable verbosity to the wrapper script.
+           "JOB": recommend optimal GPU cluster by cost per job.
     :param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
+    :param verbose: True or False to enable verbosity to the wrapper script.
+    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
+           integer value (e.g. 30 for 30% discount).
     :param rapids_options: A list of valid Qualification tool options.
            Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
            multiple "spark-property" arguments.
@@ -120,7 +129,10 @@ def qualification(cpu_cluster: str = None,
         'eventlogs': eventlogs,
         'filterApps': filter_apps,
         'toolsJar': tools_jar,
-        'gpuClusterRecommendation': gpu_cluster_recommendation
+        'gpuClusterRecommendation': gpu_cluster_recommendation,
+        'cpuDiscount': cpu_discount,
+        'gpuDiscount': gpu_discount,
+        'globalDiscount': global_discount
     }
     QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AWS,
                          cluster=None,
diff --git a/user_tools/src/spark_rapids_pytools/wrappers/databricks_azure_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/databricks_azure_wrapper.py
index 1f4de26c6..ef16ad299 100644
--- a/user_tools/src/spark_rapids_pytools/wrappers/databricks_azure_wrapper.py
+++ b/user_tools/src/spark_rapids_pytools/wrappers/databricks_azure_wrapper.py
@@ -41,6 +41,9 @@ def qualification(cpu_cluster: str = None,
                       QualGpuClusterReshapeType.get_default()),
                   jvm_heap_size: int = 24,
                   verbose: bool = False,
+                  cpu_discount: int = None,
+                  gpu_discount: int = None,
+                  global_discount: int = None,
                   **rapids_options) -> None:
     """
     The Qualification tool analyzes Spark events generated from CPU based Spark applications to
@@ -85,9 +88,15 @@ def qualification(cpu_cluster: str = None,
            It accepts one of the following ("CLUSTER", "JOB" and the default value "MATCH").
            "MATCH": keep GPU cluster same number of nodes as CPU cluster;
            "CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
-           "JOB": recommend optimal GPU cluster by cost per job
-    :param verbose: True or False to enable verbosity to the wrapper script.
+           "JOB": recommend optimal GPU cluster by cost per job.
     :param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
+    :param verbose: True or False to enable verbosity to the wrapper script.
+    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
+           integer value (e.g. 30 for 30% discount).
     :param rapids_options: A list of valid Qualification tool options.
            Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
            multiple "spark-property" arguments.
@@ -117,7 +126,10 @@ def qualification(cpu_cluster: str = None,
         'eventlogs': eventlogs,
         'filterApps': filter_apps,
         'toolsJar': tools_jar,
-        'gpuClusterRecommendation': gpu_cluster_recommendation
+        'gpuClusterRecommendation': gpu_cluster_recommendation,
+        'cpuDiscount': cpu_discount,
+        'gpuDiscount': gpu_discount,
+        'globalDiscount': global_discount
     }
     QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AZURE,
                          cluster=None,
diff --git a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_wrapper.py
index 05ae9eb60..65b03e0b4 100644
--- a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_wrapper.py
+++ b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_wrapper.py
@@ -41,6 +41,9 @@ def qualification(cpu_cluster: str = None,
                       QualGpuClusterReshapeType.get_default()),
                   jvm_heap_size: int = 24,
                   verbose: bool = False,
+                  cpu_discount: int = None,
+                  gpu_discount: int = None,
+                  global_discount: int = None,
                   **rapids_options) -> None:
     """
     The Qualification tool analyzes Spark events generated from CPU based Spark applications to
@@ -87,6 +90,12 @@ def qualification(cpu_cluster: str = None,
            "JOB": recommend optimal GPU cluster by cost per job
     :param jvm_heap_size: The maximum heap size of the JVM in gigabytes
     :param verbose: True or False to enable verbosity to the wrapper script
+    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
+           integer value (e.g. 30 for 30% discount).
     :param rapids_options: A list of valid Qualification tool options.
            Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
            multiple "spark-property" arguments.
@@ -114,7 +123,10 @@ def qualification(cpu_cluster: str = None,
         'eventlogs': eventlogs,
         'filterApps': filter_apps,
         'toolsJar': tools_jar,
-        'gpuClusterRecommendation': gpu_cluster_recommendation
+        'gpuClusterRecommendation': gpu_cluster_recommendation,
+        'cpuDiscount': cpu_discount,
+        'gpuDiscount': gpu_discount,
+        'globalDiscount': global_discount
     }
 
     tool_obj = QualificationAsLocal(platform_type=CspEnv.DATAPROC,
diff --git a/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py
index 65de11341..225075236 100644
--- a/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py
+++ b/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py
@@ -42,6 +42,9 @@ def qualification(cpu_cluster: str = None,
                       QualGpuClusterReshapeType.get_default()),
                   jvm_heap_size: int = 24,
                   verbose: bool = False,
+                  cpu_discount: int = None,
+                  gpu_discount: int = None,
+                  global_discount: int = None,
                   **rapids_options) -> None:
     """
     The Qualification tool analyzes Spark events generated from CPU based Spark applications to
@@ -85,6 +88,12 @@ def qualification(cpu_cluster: str = None,
            "JOB": recommend optimal GPU cluster by cost per job
     :param jvm_heap_size: The maximum heap size of the JVM in gigabytes
     :param verbose: True or False to enable verbosity to the wrapper script
+    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
+           integer value (e.g. 30 for 30% discount).
     :param rapids_options: A list of valid Qualification tool options.
            Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
            multiple "spark-property" arguments.
@@ -112,7 +121,10 @@ def qualification(cpu_cluster: str = None,
         'eventlogs': eventlogs,
         'filterApps': filter_apps,
         'toolsJar': tools_jar,
-        'gpuClusterRecommendation': gpu_cluster_recommendation
+        'gpuClusterRecommendation': gpu_cluster_recommendation,
+        'cpuDiscount': cpu_discount,
+        'gpuDiscount': gpu_discount,
+        'globalDiscount': global_discount
     }
     QualificationAsLocal(platform_type=CspEnv.EMR,
                          cluster=None,
diff --git a/user_tools/src/spark_rapids_pytools/wrappers/onprem_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/onprem_wrapper.py
index 1e4ea0c56..ac8bf8454 100644
--- a/user_tools/src/spark_rapids_pytools/wrappers/onprem_wrapper.py
+++ b/user_tools/src/spark_rapids_pytools/wrappers/onprem_wrapper.py
@@ -38,6 +38,9 @@ def qualification(cpu_cluster: str = None,
                       QualGpuClusterReshapeType.get_default()),
                   jvm_heap_size: int = 24,
                   verbose: bool = False,
+                  cpu_discount: int = None,
+                  gpu_discount: int = None,
+                  global_discount: int = None,
                   **rapids_options) -> None:
     """
     The Qualification tool analyzes Spark events generated from CPU based Spark applications to
@@ -65,6 +68,12 @@ def qualification(cpu_cluster: str = None,
            "JOB": recommend optimal GPU cluster by cost per job
     :param jvm_heap_size: The maximum heap size of the JVM in gigabytes
     :param verbose: True or False to enable verbosity to the wrapper script
+    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
+           (e.g. 30 for 30% discount).
+    :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
+           integer value (e.g. 30 for 30% discount).
     :param rapids_options: A list of valid Qualification tool options.
            Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
            multiple "spark-property" arguments.
@@ -103,7 +112,10 @@ def qualification(cpu_cluster: str = None,
         'filterApps': filter_apps,
         'toolsJar': tools_jar,
         'gpuClusterRecommendation': gpu_cluster_recommendation,
-        'targetPlatform': target_platform
+        'targetPlatform': target_platform,
+        'cpuDiscount': cpu_discount,
+        'gpuDiscount': gpu_discount,
+        'globalDiscount': global_discount
     }
     tool_obj = QualificationAsLocal(platform_type=CspEnv.ONPREM,
                                     output_folder=local_folder,
diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
index 4e921908a..fe7eb2880 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -307,12 +307,18 @@ class QualifyUserArgModel(ToolUserArgModel):
     target_platform: Optional[CspEnv] = None
     filter_apps: Optional[QualFilterApp] = None
     gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
+    cpu_discount: Optional[int] = None
+    gpu_discount: Optional[int] = None
+    global_discount: Optional[int] = None
 
     def init_tool_args(self):
         self.p_args['toolArgs']['platform'] = self.platform
         self.p_args['toolArgs']['savingsCalculations'] = True
         self.p_args['toolArgs']['filterApps'] = self.filter_apps
         self.p_args['toolArgs']['targetPlatform'] = self.target_platform
+        self.p_args['toolArgs']['cpuDiscount'] = self.cpu_discount
+        self.p_args['toolArgs']['gpuDiscount'] = self.gpu_discount
+        self.p_args['toolArgs']['globalDiscount'] = self.global_discount
         # check the reshapeType argument
         if self.gpu_cluster_recommendation is None:
             self.p_args['toolArgs']['gpuClusterRecommendation'] = QualGpuClusterReshapeType.get_default()
@@ -405,7 +411,10 @@ def build_tools_args(self) -> dict:
             'toolsJar': None,
             'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
             # used to initialize the pricing information
-            'targetPlatform': self.p_args['toolArgs']['targetPlatform']
+            'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
+            'cpuDiscount': self.p_args['toolArgs']['cpuDiscount'],
+            'gpuDiscount': self.p_args['toolArgs']['gpuDiscount'],
+            'globalDiscount': self.p_args['toolArgs']['globalDiscount']
         }
 
         return wrapped_args
diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
index bf0aac3ff..fd9bfdb7a 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -41,6 +41,9 @@ def qualification(self,
                       target_platform: str = None,
                       output_folder: str = None,
                       filter_apps: str = None,
+                      cpu_discount: int = None,
+                      gpu_discount: int = None,
+                      global_discount: int = None,
                       gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
                           QualGpuClusterReshapeType.get_default()),
                       verbose: bool = False):
@@ -77,6 +80,12 @@ def qualification(self,
                'Recommended', or 'Strongly Recommended' based on speedups.
                "SAVINGS" lists all the apps that have positive estimated GPU savings except for the apps that
                are "Not Applicable"
+        :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
+               (e.g. 30 for 30% discount).
+        :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
+               (e.g. 30 for 30% discount).
+        :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
+               integer value (e.g. 30 for 30% discount).
         :param gpu_cluster_recommendation: The type of GPU cluster recommendation to generate.
                Requires "Cluster".
 
@@ -96,6 +105,9 @@ def qualification(self,
             target_platform=target_platform,
             output_folder=output_folder,
             filter_apps=filter_apps,
+            cpu_discount=cpu_discount,
+            gpu_discount=gpu_discount,
+            global_discount=global_discount,
             gpu_cluster_recommendation=gpu_cluster_recommendation)
         if qual_args:
             tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'],
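
For reference, a minimal standalone sketch of the discounted-savings arithmetic that the __calc_apps_cost hunk above introduces. The raw costs and discount percentages below are hypothetical, and apply_discount is an illustrative helper that is not part of the patch (the patch inlines the same expression):

# Sketch of the discount arithmetic in get_costs_for_single_app above.
# All numeric values are hypothetical examples.

def apply_discount(raw_cost: float, discount_pct: int) -> float:
    # A discount of 30 keeps 70% of the raw cost.
    return (100 - discount_pct) / 100 * raw_cost

raw_cpu_cost, raw_gpu_cost = 100.0, 60.0  # stand-ins for the estimator's raw costs
cpu_discount, gpu_discount = 20, 30       # e.g. 20% off CPU pricing, 30% off GPU pricing

cpu_cost = apply_discount(raw_cpu_cost, cpu_discount)  # 80.0
gpu_cost = apply_discount(raw_gpu_cost, gpu_discount)  # 42.0
# Savings are recomputed from the discounted costs, as in the patch; the
# estimator's own savings figure is discarded (the '_' in the tuple unpacking).
est_savings = 100.0 - ((100.0 * gpu_cost) / cpu_cost)  # 47.5, i.e. ~47.5% estimated savings

With both discounts left at their default of 0, the multiplier is (100 - 0) / 100 = 1, so the raw costs pass through unchanged and runs without discount flags keep their previous costs.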
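
On the command line, these parameters surface as flags on each platform's qualification command through the Fire-based wrappers. Assuming the standard spark_rapids_user_tools entry point and a placeholder event-log path, an invocation would look roughly like:

spark_rapids_user_tools dataproc qualification --eventlogs <eventlogs-path> --cpu_discount 20 --gpu_discount 30

or, to discount both cluster costs at once:

spark_rapids_user_tools dataproc qualification --eventlogs <eventlogs-path> --global_discount 25

Per the validation in _process_price_discount_args, combining global_discount with either cpu_discount or gpu_discount raises a RuntimeError, as does any discount value outside the range [0, 100].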