diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
index ceee37858..91268cf6b 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
@@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -79,7 +81,9 @@ def create_saving_estimator(self,
                                                      pricing_configs={'databricks': pricing_config})
         saving_estimator = DBAWSSavingsEstimator(price_provider=databricks_price_provider,
                                                  reshaped_cluster=reshaped_cluster,
-                                                 source_cluster=source_cluster)
+                                                 source_cluster=source_cluster,
+                                                 target_cost=target_cost,
+                                                 source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -310,8 +314,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             cost = self.price_provider.get_instance_price(instance=instance_type)
             dbu_cost += cost * nodes_cnt
         return self.__calculate_ec2_cost(cluster) + dbu_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
index f0c863e47..6deb9c369 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -79,7 +81,9 @@ def create_saving_estimator(self,
                                                     pricing_configs={'databricks-azure': pricing_config})
         saving_estimator = DBAzureSavingsEstimator(price_provider=db_azure_price_provider,
                                                    reshaped_cluster=reshaped_cluster,
-                                                   source_cluster=source_cluster)
+                                                   source_cluster=source_cluster,
+                                                   target_cost=target_cost,
+                                                   source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -381,8 +385,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             cost = self.price_provider.get_instance_price(instance=instance_type)
             db_azure_cost += cost * nodes_cnt
         return db_azure_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
index f2865dd6d..78cfa49be 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -99,7 +99,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config,
@@ -110,7 +112,9 @@ def create_saving_estimator(self,
                                                      pricing_configs={'gcloud': pricing_config})
         saving_estimator = DataprocSavingsEstimator(price_provider=pricing_provider,
                                                     reshaped_cluster=reshaped_cluster,
-                                                    source_cluster=source_cluster)
+                                                    source_cluster=source_cluster,
+                                                    target_cost=target_cost,
+                                                    source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -532,8 +536,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
         workers_cost = self.__calculate_group_cost(cluster, SparkNodeType.WORKER)
         dataproc_cost = self.price_provider.get_container_cost()
         return master_cost + workers_cost + dataproc_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
index a938e2f3e..c0e04c8f5 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -95,7 +95,9 @@ def validate_job_submission_args(self, submission_args: dict) -> dict:
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -105,7 +107,9 @@ def create_saving_estimator(self,
                                                pricing_configs={'emr': pricing_config})
         saving_estimator = EmrSavingsEstimator(price_provider=emr_price_provider,
                                                reshaped_cluster=reshaped_cluster,
-                                               source_cluster=source_cluster)
+                                               source_cluster=source_cluster,
+                                               target_cost=target_cost,
+                                               source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -493,8 +497,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             total_cost += self._calculate_ec2_cost(cluster, node_type)
             total_cost += self._calculate_emr_cost(cluster, node_type)
         return total_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
index ef96c6420..28584617b 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -82,7 +82,9 @@ def get_footer_message(self) -> str:
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         if self.platform == 'dataproc':
             region = 'us-central1'
             raw_pricing_config = self.configs.get_value_silent('csp_pricing')
@@ -95,7 +97,9 @@ def create_saving_estimator(self,
                                                      pricing_configs={'gcloud': pricing_config})
         saving_estimator = OnpremSavingsEstimator(price_provider=pricing_provider,
                                                   reshaped_cluster=reshaped_cluster,
-                                                  source_cluster=source_cluster)
+                                                  source_cluster=source_cluster,
+                                                  target_cost=target_cost,
+                                                  source_cost=source_cost)
         return saving_estimator
 
     def set_offline_cluster(self, cluster_args: dict = None):
@@ -311,8 +315,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
         dataproc_cost = self.price_provider.get_container_cost()
         total_cost = master_cost + workers_cost + dataproc_cost
         return total_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
index 2c99f8bf8..57979e2d5 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -806,7 +806,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raise NotImplementedError
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
diff --git a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
index 6c705da69..a4a35f7c6 100644
--- a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
+++ b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
@@ -127,14 +127,20 @@ class SavingsEstimator:
     price_provider: PriceProvider
     source_cluster: ClusterGetAccessor
     reshaped_cluster: ClusterGetAccessor
-    target_cost: float = field(default=None, init=False)
-    source_cost: float = field(default=None, init=False)
+    target_cost: float = field(default=None)
+    source_cost: float = field(default=None)
     comments: list = field(default_factory=lambda: [], init=False)
     logger: Logger = field(default=None, init=False)
 
+    def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
+        raise NotImplementedError
+
     def _setup_costs(self):
         # calculate target_cost
-        pass
+        if self.target_cost is None:
+            self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
+        if self.source_cost is None:
+            self.source_cost = self._get_cost_per_cluster(self.source_cluster)
 
     def __post_init__(self):
         # when debug is set to true set it in the environment.
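The net effect of the `price_provider.py` hunk above: the duplicated `_setup_costs` overrides deleted from each platform module are replaced by a single base-class implementation that treats user-supplied costs as authoritative and only falls back to the per-platform `_get_cost_per_cluster` lookup when a cost is missing. A minimal runnable sketch of that precedence rule (the class and prices below are hypothetical stand-ins, not the real `SavingsEstimator`):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EstimatorSketch:
    """Stand-in for SavingsEstimator's cost-resolution behavior."""
    target_cost: Optional[float] = None   # GPU (reshaped) cluster $/hour
    source_cost: Optional[float] = None   # CPU (source) cluster $/hour

    def _get_cost_per_cluster(self, cluster: str) -> float:
        # Stand-in for the per-platform pricing-catalog lookup.
        return {'cpu': 10.0, 'gpu': 4.0}[cluster]

    def _setup_costs(self):
        # Compute a cost only when the caller did not provide one.
        if self.target_cost is None:
            self.target_cost = self._get_cost_per_cluster('gpu')
        if self.source_cost is None:
            self.source_cost = self._get_cost_per_cluster('cpu')


est = EstimatorSketch(source_cost=12.5)  # user supplied only the CPU price
est._setup_costs()
assert est.source_cost == 12.5           # user-provided value wins
assert est.target_cost == 4.0            # missing value computed as fallback
```

This also explains why the dataclass fields drop `init=False`: the two costs must now be settable through the constructor so the platform `create_saving_estimator` methods can forward them.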
diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
index fe6e10047..7b5ed96b1 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -291,6 +291,12 @@ def __process_filter_args(self, arg_val: str):
             selected_filter = QualFilterApp.fromstring(default_filter_txt)
         self.ctxt.set_ctxt('filterApps', selected_filter)
 
+    def _process_external_pricing_args(self):
+        cpu_cluster_price = self.wrapper_options.get('cpuClusterPrice')
+        estimated_gpu_cluster_price = self.wrapper_options.get('estimatedGpuClusterPrice')
+        self.ctxt.set_ctxt('source_cost', cpu_cluster_price)
+        self.ctxt.set_ctxt('target_cost', estimated_gpu_cluster_price)
+
     def _process_price_discount_args(self):
         def check_discount_percentage(discount_type: str, discount_value: int):
             if discount_value < 0 or discount_value > 100:
@@ -356,6 +362,7 @@ def _process_custom_args(self):
 
         self._process_offline_cluster_args()
         self._process_eventlogs_args()
+        self._process_external_pricing_args()
         self._process_price_discount_args()
         # This is noise to dump everything
         # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
@@ -595,7 +602,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
                 reshaped_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'),
                                                   reshape_workers_cnt=lambda x: workers_cnt)
                 estimator_obj = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
-                                                                           reshaped_cluster)
+                                                                           reshaped_cluster,
+                                                                           self.ctxt.get_ctxt('target_cost'),
+                                                                           self.ctxt.get_ctxt('source_cost'))
                 saving_estimator_cache.setdefault(workers_cnt, estimator_obj)
             cost_pd_series = get_costs_for_single_app(df_row, estimator_obj)
             return cost_pd_series
@@ -605,7 +614,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
             # initialize the savings estimator only once
             reshaped_gpu_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'))
             savings_estimator = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
-                                                                           reshaped_gpu_cluster)
+                                                                           reshaped_gpu_cluster,
+                                                                           self.ctxt.get_ctxt('target_cost'),
+                                                                           self.ctxt.get_ctxt('source_cost'))
             app_df_set[cost_cols] = app_df_set.apply(
                 lambda row: get_costs_for_single_app(row, estimator=savings_estimator), axis=1)
         else:
diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
index fe7eb2880..67624ac7a 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -307,6 +307,8 @@ class QualifyUserArgModel(ToolUserArgModel):
     target_platform: Optional[CspEnv] = None
     filter_apps: Optional[QualFilterApp] = None
     gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
+    cpu_cluster_price: Optional[float] = None
+    estimated_gpu_cluster_price: Optional[float] = None
     cpu_discount: Optional[int] = None
     gpu_discount: Optional[int] = None
     global_discount: Optional[int] = None
@@ -316,6 +318,8 @@ def init_tool_args(self):
         self.p_args['toolArgs']['savingsCalculations'] = True
         self.p_args['toolArgs']['filterApps'] = self.filter_apps
         self.p_args['toolArgs']['targetPlatform'] = self.target_platform
+        self.p_args['toolArgs']['cpuClusterPrice'] = self.cpu_cluster_price
+        self.p_args['toolArgs']['estimatedGpuClusterPrice'] = self.estimated_gpu_cluster_price
         self.p_args['toolArgs']['cpuDiscount'] = self.cpu_discount
         self.p_args['toolArgs']['gpuDiscount'] = self.gpu_discount
         self.p_args['toolArgs']['globalDiscount'] = self.global_discount
@@ -412,6 +416,8 @@ def build_tools_args(self) -> dict:
             'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
             # used to initialize the pricing information
             'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
+            'cpuClusterPrice': self.p_args['toolArgs']['cpuClusterPrice'],
+            'estimatedGpuClusterPrice': self.p_args['toolArgs']['estimatedGpuClusterPrice'],
             'cpuDiscount': self.p_args['toolArgs']['cpuDiscount'],
             'gpuDiscount': self.p_args['toolArgs']['gpuDiscount'],
             'globalDiscount': self.p_args['toolArgs']['globalDiscount']
diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
index fd9bfdb7a..2f971dcbe 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -41,6 +41,8 @@ def qualification(self,
                       target_platform: str = None,
                       output_folder: str = None,
                       filter_apps: str = None,
+                      cpu_cluster_price: float = None,
+                      estimated_gpu_cluster_price: float = None,
                       cpu_discount: int = None,
                       gpu_discount: int = None,
                       global_discount: int = None,
@@ -80,6 +82,8 @@ def qualification(self,
                 'Recommended', or 'Strongly Recommended' based on speedups.
                 "SAVINGS" lists all the apps that have positive estimated GPU savings except for the apps that
                 are "Not Applicable"
+        :param cpu_cluster_price: the CPU cluster hourly price provided by the user.
+        :param estimated_gpu_cluster_price: the GPU cluster hourly price provided by the user.
         :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
                 (e.g. 30 for 30% discount).
         :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
@@ -105,6 +109,8 @@ def qualification(self,
                                        target_platform=target_platform,
                                        output_folder=output_folder,
                                        filter_apps=filter_apps,
+                                       cpu_cluster_price=cpu_cluster_price,
+                                       estimated_gpu_cluster_price=estimated_gpu_cluster_price,
                                        cpu_discount=cpu_discount,
                                        gpu_discount=gpu_discount,
                                        global_discount=global_discount,
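Taken together, the CLI changes let a user pin both cluster prices when invoking the qualification tool. A hedged usage sketch (the `ToolsCLI` class name and the `eventlogs` parameter are assumed from the module path and the only partially shown signature; all values are placeholders):

```python
# Hypothetical direct invocation of the CLI wrapper with user-provided
# hourly prices, so the savings estimator skips the catalog lookup.
from spark_rapids_tools.cmdli.tools_cli import ToolsCLI  # class name assumed

ToolsCLI().qualification(
    eventlogs='/path/to/eventlogs',    # placeholder; parameter assumed from the full signature
    cpu_cluster_price=2.10,            # CPU cluster $/hour, becomes source_cost
    estimated_gpu_cluster_price=3.45,  # GPU cluster $/hour, becomes target_cost
)
```

On the command line these keyword arguments would presumably surface as `--cpu_cluster_price` and `--estimated_gpu_cluster_price` flags, flowing through `QualifyUserArgModel.build_tools_args` into the wrapper context (`_process_external_pricing_args`) and finally into each platform's `create_saving_estimator`.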