From c743611043d414c26ecf66e52feac4f59e4a6630 Mon Sep 17 00:00:00 2001
From: cindyyuanjiang
Date: Mon, 25 Sep 2023 14:42:24 -0700
Subject: [PATCH 1/3] initial implementation for external pricing options

Signed-off-by: cindyyuanjiang
---
 .../cloud_api/databricks_aws.py                  | 13 ++++++-------
 .../cloud_api/databricks_azure.py                | 13 ++++++-------
 .../spark_rapids_pytools/cloud_api/dataproc.py   | 13 ++++++-------
 .../src/spark_rapids_pytools/cloud_api/emr.py    | 13 ++++++-------
 .../src/spark_rapids_pytools/cloud_api/onprem.py | 13 ++++++-------
 .../pricing/price_provider.py                    | 12 +++++++++---
 .../spark_rapids_pytools/rapids/qualification.py | 15 +++++++++++++--
 .../src/spark_rapids_tools/cmdli/argprocessor.py |  8 +++++++-
 .../src/spark_rapids_tools/cmdli/tools_cli.py    |  6 ++++++
 9 files changed, 65 insertions(+), 41 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
index ceee37858..91268cf6b 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
@@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -79,7 +81,9 @@ def create_saving_estimator(self,
                                                            pricing_configs={'databricks': pricing_config})
         saving_estimator = DBAWSSavingsEstimator(price_provider=databricks_price_provider,
                                                  reshaped_cluster=reshaped_cluster,
-                                                 source_cluster=source_cluster)
+                                                 source_cluster=source_cluster,
+                                                 target_cost=target_cost,
+                                                 source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -310,8 +314,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             cost = self.price_provider.get_instance_price(instance=instance_type)
             dbu_cost += cost * nodes_cnt
         return self.__calculate_ec2_cost(cluster) + dbu_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
index f0c863e47..6deb9c369 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -79,7 +81,9 @@ def create_saving_estimator(self,
                                                          pricing_configs={'databricks-azure': pricing_config})
         saving_estimator = DBAzureSavingsEstimator(price_provider=db_azure_price_provider,
                                                    reshaped_cluster=reshaped_cluster,
-                                                   source_cluster=source_cluster)
+                                                   source_cluster=source_cluster,
+                                                   target_cost=target_cost,
+                                                   source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -381,8 +385,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             cost = self.price_provider.get_instance_price(instance=instance_type)
             db_azure_cost += cost * nodes_cnt
         return db_azure_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
index f2865dd6d..78cfa49be 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -99,7 +99,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config,
@@ -110,7 +112,9 @@ def create_saving_estimator(self,
                                                  pricing_configs={'gcloud': pricing_config})
         saving_estimator = DataprocSavingsEstimator(price_provider=pricing_provider,
                                                     reshaped_cluster=reshaped_cluster,
-                                                    source_cluster=source_cluster)
+                                                    source_cluster=source_cluster,
+                                                    target_cost=target_cost,
+                                                    source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -532,8 +536,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
         workers_cost = self.__calculate_group_cost(cluster, SparkNodeType.WORKER)
         dataproc_cost = self.price_provider.get_container_cost()
         return master_cost + workers_cost + dataproc_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
index a938e2f3e..c0e04c8f5 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -95,7 +95,9 @@ def validate_job_submission_args(self, submission_args: dict) -> dict:
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -105,7 +107,9 @@ def create_saving_estimator(self,
                                                    pricing_configs={'emr': pricing_config})
         saving_estimator = EmrSavingsEstimator(price_provider=emr_price_provider,
                                                reshaped_cluster=reshaped_cluster,
-                                               source_cluster=source_cluster)
+                                               source_cluster=source_cluster,
+                                               target_cost=target_cost,
+                                               source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -493,8 +497,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             total_cost += self._calculate_ec2_cost(cluster, node_type)
             total_cost += self._calculate_emr_cost(cluster, node_type)
         return total_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
index ef96c6420..28584617b 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -82,7 +82,9 @@ def get_footer_message(self) -> str:
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         if self.platform == 'dataproc':
             region = 'us-central1'
             raw_pricing_config = self.configs.get_value_silent('csp_pricing')
@@ -95,7 +97,9 @@ def create_saving_estimator(self,
                                                    pricing_configs={'gcloud': pricing_config})
         saving_estimator = OnpremSavingsEstimator(price_provider=pricing_provider,
                                                   reshaped_cluster=reshaped_cluster,
-                                                  source_cluster=source_cluster)
+                                                  source_cluster=source_cluster,
+                                                  target_cost=target_cost,
+                                                  source_cost=source_cost)
         return saving_estimator
 
     def set_offline_cluster(self, cluster_args: dict = None):
@@ -311,8 +315,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
         dataproc_cost = self.price_provider.get_container_cost()
         total_cost = master_cost + workers_cost + dataproc_cost
         return total_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
diff --git a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
index 6c705da69..e34cfc2db 100644
--- a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
+++ b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
@@ -127,14 +127,20 @@ class SavingsEstimator:
     price_provider: PriceProvider
     source_cluster: ClusterGetAccessor
     reshaped_cluster: ClusterGetAccessor
-    target_cost: float = field(default=None, init=False)
-    source_cost: float = field(default=None, init=False)
+    target_cost: float = field(default=None)
+    source_cost: float = field(default=None)
     comments: list = field(default_factory=lambda: [], init=False)
     logger: Logger = field(default=None, init=False)
 
     def _setup_costs(self):
         # calculate target_cost
-        pass
+        if self.target_cost is None:
+            self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
+        if self.source_cost is None:
+            self.source_cost = self._get_cost_per_cluster(self.source_cluster)
+
+        self.logger.info("self.source_cost = %s", self.source_cost)
+        self.logger.info("self.target_cost = %s", self.target_cost)
 
     def __post_init__(self):
         # when debug is set to true set it in the environment.
diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
index a8861c6a1..51e9323ba 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -291,6 +291,12 @@ def __process_filter_args(self, arg_val: str):
             selected_filter = QualFilterApp.fromstring(default_filter_txt)
         self.ctxt.set_ctxt('filterApps', selected_filter)
 
+    def _process_external_pricing_args(self):
+        cpu_cluster_price = self.wrapper_options.get('cpuClusterPrice')
+        estimated_gpu_cluster_price = self.wrapper_options.get('estimatedGpuClusterPrice')
+        self.ctxt.set_ctxt('source_cost', cpu_cluster_price)
+        self.ctxt.set_ctxt('target_cost', estimated_gpu_cluster_price)
+
     def _process_custom_args(self):
         """
         Qualification tool processes extra arguments:
@@ -322,6 +328,7 @@ def _process_custom_args(self):
 
         self._process_offline_cluster_args()
         self._process_eventlogs_args()
+        self._process_external_pricing_args()
         # This is noise to dump everything
         # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
 
@@ -557,7 +564,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
                 reshaped_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'),
                                                   reshape_workers_cnt=lambda x: workers_cnt)
                 estimator_obj = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
-                                                                           reshaped_cluster)
+                                                                           reshaped_cluster,
+                                                                           target_cost=self.ctxt.get_ctxt('target_cost'),
+                                                                           source_cost= self.ctxt.get_ctxt('source_cost'))
                 saving_estimator_cache.setdefault(workers_cnt, estimator_obj)
             cost_pd_series = get_costs_for_single_app(df_row, estimator_obj)
             return cost_pd_series
@@ -567,7 +576,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
             # initialize the savings estimator only once
             reshaped_gpu_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'))
             savings_estimator = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
-                                                                           reshaped_gpu_cluster)
+                                                                           reshaped_gpu_cluster,
+                                                                           target_cost=self.ctxt.get_ctxt('target_cost'),
+                                                                           source_cost= self.ctxt.get_ctxt('source_cost'))
             app_df_set[cost_cols] = app_df_set.apply(
                 lambda row: get_costs_for_single_app(row, estimator=savings_estimator), axis=1)
         else:
diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
index 4e921908a..3221bd3be 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -307,12 +307,16 @@ class QualifyUserArgModel(ToolUserArgModel):
     target_platform: Optional[CspEnv] = None
     filter_apps: Optional[QualFilterApp] = None
     gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
+    cpu_cluster_price: Optional[float] = None
+    estimated_gpu_cluster_price: Optional[float] = None
 
     def init_tool_args(self):
         self.p_args['toolArgs']['platform'] = self.platform
         self.p_args['toolArgs']['savingsCalculations'] = True
         self.p_args['toolArgs']['filterApps'] = self.filter_apps
         self.p_args['toolArgs']['targetPlatform'] = self.target_platform
+        self.p_args['toolArgs']['cpuClusterPrice'] = self.cpu_cluster_price
+        self.p_args['toolArgs']['estimatedGpuClusterPrice'] = self.estimated_gpu_cluster_price
         # check the reshapeType argument
         if self.gpu_cluster_recommendation is None:
             self.p_args['toolArgs']['gpuClusterRecommendation'] = QualGpuClusterReshapeType.get_default()
@@ -405,7 +409,9 @@ def build_tools_args(self) -> dict:
             'toolsJar': None,
             'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
             # used to initialize the pricing information
-            'targetPlatform': self.p_args['toolArgs']['targetPlatform']
+            'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
+            'cpuClusterPrice': self.p_args['toolArgs']['cpuClusterPrice'],
+            'estimatedGpuClusterPrice': self.p_args['toolArgs']['estimatedGpuClusterPrice']
         }
 
         return wrapped_args
diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
index bf0aac3ff..cdcafd9d2 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -41,6 +41,8 @@ def qualification(self,
                       target_platform: str = None,
                       output_folder: str = None,
                       filter_apps: str = None,
+                      cpu_cluster_price: float = None,
+                      estimated_gpu_cluster_price: float = None,
                       gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
                           QualGpuClusterReshapeType.get_default()),
                       verbose: bool = False):
@@ -77,6 +79,8 @@ def qualification(self,
                'Recommended', or 'Strongly Recommended' based on speedups.
                "SAVINGS" lists all the apps that have positive estimated GPU savings except for the apps
                that are "Not Applicable"
+        :param cpu_cluster_price: The CPU cluster hourly price provided by the user.
+        :param estimated_gpu_cluster_price: The GPU cluster hourly price provided by the user.
         :param gpu_cluster_recommendation: The type of GPU cluster recommendation to generate.
                Requires "Cluster".
 
@@ -96,6 +100,8 @@ def qualification(self,
             target_platform=target_platform,
             output_folder=output_folder,
             filter_apps=filter_apps,
+            cpu_cluster_price=cpu_cluster_price,
+            estimated_gpu_cluster_price=estimated_gpu_cluster_price,
             gpu_cluster_recommendation=gpu_cluster_recommendation)
         if qual_args:
             tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'],

From 7d44b69f4ffcffe4e8c75df3baef3be011fabdf1 Mon Sep 17 00:00:00 2001
From: cindyyuanjiang
Date: Wed, 27 Sep 2023 11:31:50 -0700
Subject: [PATCH 2/3] resolve merge conflicts with dev branch

Signed-off-by: cindyyuanjiang
---
 .../src/spark_rapids_tools/cmdli/argprocessor.py     | 11 +----------
 user_tools/src/spark_rapids_tools/cmdli/tools_cli.py |  9 ---------
 2 files changed, 1 insertion(+), 19 deletions(-)

diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
index 4766fc637..67624ac7a 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -307,28 +307,22 @@ class QualifyUserArgModel(ToolUserArgModel):
     target_platform: Optional[CspEnv] = None
     filter_apps: Optional[QualFilterApp] = None
     gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
-<<<<<<< HEAD
     cpu_cluster_price: Optional[float] = None
     estimated_gpu_cluster_price: Optional[float] = None
-=======
     cpu_discount: Optional[int] = None
     gpu_discount: Optional[int] = None
     global_discount: Optional[int] = None
->>>>>>> dev
 
     def init_tool_args(self):
         self.p_args['toolArgs']['platform'] = self.platform
         self.p_args['toolArgs']['savingsCalculations'] = True
         self.p_args['toolArgs']['filterApps'] = self.filter_apps
         self.p_args['toolArgs']['targetPlatform'] = self.target_platform
-<<<<<<< HEAD
         self.p_args['toolArgs']['cpuClusterPrice'] = self.cpu_cluster_price
         self.p_args['toolArgs']['estimatedGpuClusterPrice'] = self.estimated_gpu_cluster_price
-=======
         self.p_args['toolArgs']['cpuDiscount'] = self.cpu_discount
         self.p_args['toolArgs']['gpuDiscount'] = self.gpu_discount
         self.p_args['toolArgs']['globalDiscount'] = self.global_discount
->>>>>>> dev
         # check the reshapeType argument
         if self.gpu_cluster_recommendation is None:
             self.p_args['toolArgs']['gpuClusterRecommendation'] = QualGpuClusterReshapeType.get_default()
@@ -422,14 +416,11 @@ def build_tools_args(self) -> dict:
             'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
             # used to initialize the pricing information
             'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
-<<<<<<< HEAD
             'cpuClusterPrice': self.p_args['toolArgs']['cpuClusterPrice'],
-            'estimatedGpuClusterPrice': self.p_args['toolArgs']['estimatedGpuClusterPrice']
-=======
+            'estimatedGpuClusterPrice': self.p_args['toolArgs']['estimatedGpuClusterPrice'],
             'cpuDiscount': self.p_args['toolArgs']['cpuDiscount'],
             'gpuDiscount': self.p_args['toolArgs']['gpuDiscount'],
             'globalDiscount': self.p_args['toolArgs']['globalDiscount']
->>>>>>> dev
         }
 
         return wrapped_args
diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
index bc330f340..2f971dcbe 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -41,14 +41,11 @@ def qualification(self,
                       target_platform: str = None,
                       output_folder: str = None,
                       filter_apps: str = None,
-<<<<<<< HEAD
                       cpu_cluster_price: float = None,
                       estimated_gpu_cluster_price: float = None,
-=======
                       cpu_discount: int = None,
                       gpu_discount: int = None,
                       global_discount: int = None,
->>>>>>> dev
                       gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
                           QualGpuClusterReshapeType.get_default()),
                       verbose: bool = False):
@@ -85,17 +82,14 @@ def qualification(self,
                'Recommended', or 'Strongly Recommended' based on speedups.
                "SAVINGS" lists all the apps that have positive estimated GPU savings except for the apps
                that are "Not Applicable"
-<<<<<<< HEAD
         :param cpu_cluster_price: The CPU cluster hourly price provided by the user.
         :param estimated_gpu_cluster_price: The GPU cluster hourly price provided by the user.
-=======
         :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
               (e.g. 30 for 30% discount).
         :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
              (e.g. 30 for 30% discount).
        :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an
              integer value (e.g. 30 for 30% discount).
->>>>>>> dev
         :param gpu_cluster_recommendation: The type of GPU cluster recommendation to generate.
                Requires "Cluster".
 
@@ -115,14 +109,11 @@ def qualification(self,
             target_platform=target_platform,
             output_folder=output_folder,
             filter_apps=filter_apps,
-<<<<<<< HEAD
             cpu_cluster_price=cpu_cluster_price,
             estimated_gpu_cluster_price=estimated_gpu_cluster_price,
-=======
             cpu_discount=cpu_discount,
             gpu_discount=gpu_discount,
             global_discount=global_discount,
->>>>>>> dev
             gpu_cluster_recommendation=gpu_cluster_recommendation)
         if qual_args:
             tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'],

From 16303f0c8a477688d04791925a497e2f8eacd73c Mon Sep 17 00:00:00 2001
From: cindyyuanjiang
Date: Wed, 27 Sep 2023 13:50:35 -0700
Subject: [PATCH 3/3] fixed python style

Signed-off-by: cindyyuanjiang
---
 .../src/spark_rapids_pytools/cloud_api/sp_types.py     | 4 +++-
 .../src/spark_rapids_pytools/pricing/price_provider.py | 6 +++---
 .../src/spark_rapids_pytools/rapids/qualification.py   | 9 +++++----
 3 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
index 2c99f8bf8..57979e2d5 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -806,7 +806,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raise NotImplementedError
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
diff --git a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
index e34cfc2db..a4a35f7c6 100644
--- a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
+++ b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py
@@ -132,6 +132,9 @@ class SavingsEstimator:
     comments: list = field(default_factory=lambda: [], init=False)
     logger: Logger = field(default=None, init=False)
 
+    def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
+        raise NotImplementedError
+
     def _setup_costs(self):
         # calculate target_cost
         if self.target_cost is None:
@@ -139,9 +142,6 @@ def _setup_costs(self):
             self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
         if self.source_cost is None:
             self.source_cost = self._get_cost_per_cluster(self.source_cluster)
-
-        self.logger.info("self.source_cost = %s", self.source_cost)
-        self.logger.info("self.target_cost = %s", self.target_cost)
 
     def __post_init__(self):
         # when debug is set to true set it in the environment.
         self.logger = ToolLogging.get_and_setup_logger('rapids.tools.savings')
diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
index 190f76d7a..7b5ed96b1 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -296,6 +296,7 @@ def _process_external_pricing_args(self):
         estimated_gpu_cluster_price = self.wrapper_options.get('estimatedGpuClusterPrice')
         self.ctxt.set_ctxt('source_cost', cpu_cluster_price)
         self.ctxt.set_ctxt('target_cost', estimated_gpu_cluster_price)
+
     def _process_price_discount_args(self):
         def check_discount_percentage(discount_type: str, discount_value: int):
             if discount_value < 0 or discount_value > 100:
@@ -602,8 +603,8 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
                                                   reshape_workers_cnt=lambda x: workers_cnt)
                 estimator_obj = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
                                                                            reshaped_cluster,
-                                                                           target_cost=self.ctxt.get_ctxt('target_cost'),
-                                                                           source_cost= self.ctxt.get_ctxt('source_cost'))
+                                                                           self.ctxt.get_ctxt('target_cost'),
+                                                                           self.ctxt.get_ctxt('source_cost'))
                 saving_estimator_cache.setdefault(workers_cnt, estimator_obj)
             cost_pd_series = get_costs_for_single_app(df_row, estimator_obj)
             return cost_pd_series
@@ -614,8 +615,8 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
             # initialize the savings estimator only once
             reshaped_gpu_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'))
             savings_estimator = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
                                                                            reshaped_gpu_cluster,
-                                                                           target_cost=self.ctxt.get_ctxt('target_cost'),
-                                                                           source_cost= self.ctxt.get_ctxt('source_cost'))
+                                                                           self.ctxt.get_ctxt('target_cost'),
+                                                                           self.ctxt.get_ctxt('source_cost'))
             app_df_set[cost_cols] = app_df_set.apply(
                 lambda row: get_costs_for_single_app(row, estimator=savings_estimator), axis=1)
         else:
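
Note (not part of the patches): a usage sketch for the two new options. It assumes the unified CLI entry point backed by tools_cli.py (named `spark_rapids` in packaged releases; the entry-point name is an assumption here) and a local event-log path; python-fire maps the qualification parameters added above to the flags shown below. The prices are plain hourly figures in whatever unit the user's own pricing source uses, and the discount flags merged in from dev can be combined with them:

    spark_rapids qualification \
        --eventlogs /path/to/eventlogs \
        --platform databricks-aws \
        --cpu_cluster_price 4.50 \
        --estimated_gpu_cluster_price 6.75 \
        --cpu_discount 20

When either price flag is omitted, SavingsEstimator._setup_costs falls back to the catalog-based per-cluster estimate (see PATCH 1/3 and PATCH 3/3).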
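
To make that precedence concrete, here is a minimal, runnable sketch of the rule the patches implement — a user-supplied price short-circuits the catalog lookup. The names below are illustrative stand-ins, not the tool's actual classes:

    from typing import Callable, Optional

    def resolve_cost(external_price: Optional[float],
                     catalog_estimate: Callable[[], float]) -> float:
        # Mirrors the None-checks added to SavingsEstimator._setup_costs:
        # an externally supplied price wins; otherwise query the catalog.
        return external_price if external_price is not None else catalog_estimate()

    # Example: the user passed --cpu_cluster_price 4.50 but no GPU price.
    source_cost = resolve_cost(4.50, lambda: 5.10)   # user value wins -> 4.50
    target_cost = resolve_cost(None, lambda: 6.75)   # catalog fallback -> 6.75
    assert (source_cost, target_cost) == (4.50, 6.75)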