Skip to content

Commit

Permalink
[FEA] Add qualification user tool options to support external pricing (
Browse files Browse the repository at this point in the history
…#595)

* initial implementation for external pricing options

Signed-off-by: cindyyuanjiang <[email protected]>

* merge conflict

Signed-off-by: cindyyuanjiang <[email protected]>

* fixed python style

Signed-off-by: cindyyuanjiang <[email protected]>

---------

Signed-off-by: cindyyuanjiang <[email protected]>
  • Loading branch information
cindyyuanjiang authored Sep 29, 2023
1 parent 162b728 commit da8f88f
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 41 deletions.
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):

def create_saving_estimator(self,
source_cluster: ClusterGetAccessor,
reshaped_cluster: ClusterGetAccessor):
reshaped_cluster: ClusterGetAccessor,
target_cost: float = None,
source_cost: float = None):
raw_pricing_config = self.configs.get_value_silent('pricing')
if raw_pricing_config:
pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
Expand All @@ -79,7 +81,9 @@ def create_saving_estimator(self,
pricing_configs={'databricks': pricing_config})
saving_estimator = DBAWSSavingsEstimator(price_provider=databricks_price_provider,
reshaped_cluster=reshaped_cluster,
source_cluster=source_cluster)
source_cluster=source_cluster,
target_cost=target_cost,
source_cost=source_cost)
return saving_estimator

def create_local_submission_job(self, job_prop, ctxt) -> Any:
Expand Down Expand Up @@ -310,8 +314,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
cost = self.price_provider.get_instance_price(instance=instance_type)
dbu_cost += cost * nodes_cnt
return self.__calculate_ec2_cost(cluster) + dbu_cost

def _setup_costs(self):
    # Price each cluster once so the estimator can report CPU-vs-GPU savings.
    self.source_cost = self._get_cost_per_cluster(self.source_cluster)
    self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):

def create_saving_estimator(self,
source_cluster: ClusterGetAccessor,
reshaped_cluster: ClusterGetAccessor):
reshaped_cluster: ClusterGetAccessor,
target_cost: float = None,
source_cost: float = None):
raw_pricing_config = self.configs.get_value_silent('pricing')
if raw_pricing_config:
pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
Expand All @@ -79,7 +81,9 @@ def create_saving_estimator(self,
pricing_configs={'databricks-azure': pricing_config})
saving_estimator = DBAzureSavingsEstimator(price_provider=db_azure_price_provider,
reshaped_cluster=reshaped_cluster,
source_cluster=source_cluster)
source_cluster=source_cluster,
target_cost=target_cost,
source_cost=source_cost)
return saving_estimator

def create_local_submission_job(self, job_prop, ctxt) -> Any:
Expand Down Expand Up @@ -381,8 +385,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
cost = self.price_provider.get_instance_price(instance=instance_type)
db_azure_cost += cost * nodes_cnt
return db_azure_cost

def _setup_costs(self):
    # Compute the per-cluster costs used by the savings comparison:
    # the original (source) cluster and the reshaped (target) cluster.
    self.source_cost = self._get_cost_per_cluster(self.source_cluster)
    self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):

def create_saving_estimator(self,
source_cluster: ClusterGetAccessor,
reshaped_cluster: ClusterGetAccessor):
reshaped_cluster: ClusterGetAccessor,
target_cost: float = None,
source_cost: float = None):
raw_pricing_config = self.configs.get_value_silent('pricing')
if raw_pricing_config:
pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config,
Expand All @@ -110,7 +112,9 @@ def create_saving_estimator(self,
pricing_configs={'gcloud': pricing_config})
saving_estimator = DataprocSavingsEstimator(price_provider=pricing_provider,
reshaped_cluster=reshaped_cluster,
source_cluster=source_cluster)
source_cluster=source_cluster,
target_cost=target_cost,
source_cost=source_cost)
return saving_estimator

def create_local_submission_job(self, job_prop, ctxt) -> Any:
Expand Down Expand Up @@ -532,8 +536,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
workers_cost = self.__calculate_group_cost(cluster, SparkNodeType.WORKER)
dataproc_cost = self.price_provider.get_container_cost()
return master_cost + workers_cost + dataproc_cost

def _setup_costs(self):
    # Evaluate the cost of both clusters up-front for the savings estimate.
    self.source_cost = self._get_cost_per_cluster(self.source_cluster)
    self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ def validate_job_submission_args(self, submission_args: dict) -> dict:

def create_saving_estimator(self,
source_cluster: ClusterGetAccessor,
reshaped_cluster: ClusterGetAccessor):
reshaped_cluster: ClusterGetAccessor,
target_cost: float = None,
source_cost: float = None):
raw_pricing_config = self.configs.get_value_silent('pricing')
if raw_pricing_config:
pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
Expand All @@ -105,7 +107,9 @@ def create_saving_estimator(self,
pricing_configs={'emr': pricing_config})
saving_estimator = EmrSavingsEstimator(price_provider=emr_price_provider,
reshaped_cluster=reshaped_cluster,
source_cluster=source_cluster)
source_cluster=source_cluster,
target_cost=target_cost,
source_cost=source_cost)
return saving_estimator

def create_local_submission_job(self, job_prop, ctxt) -> Any:
Expand Down Expand Up @@ -493,8 +497,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
total_cost += self._calculate_ec2_cost(cluster, node_type)
total_cost += self._calculate_emr_cost(cluster, node_type)
return total_cost

def _setup_costs(self):
    # Populate source_cost/target_cost by pricing each cluster via the
    # platform-specific _get_cost_per_cluster implementation.
    self.source_cost = self._get_cost_per_cluster(self.source_cluster)
    self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def get_footer_message(self) -> str:

def create_saving_estimator(self,
source_cluster: ClusterGetAccessor,
reshaped_cluster: ClusterGetAccessor):
reshaped_cluster: ClusterGetAccessor,
target_cost: float = None,
source_cost: float = None):
if self.platform == 'dataproc':
region = 'us-central1'
raw_pricing_config = self.configs.get_value_silent('csp_pricing')
Expand All @@ -95,7 +97,9 @@ def create_saving_estimator(self,
pricing_configs={'gcloud': pricing_config})
saving_estimator = OnpremSavingsEstimator(price_provider=pricing_provider,
reshaped_cluster=reshaped_cluster,
source_cluster=source_cluster)
source_cluster=source_cluster,
target_cost=target_cost,
source_cost=source_cost)
return saving_estimator

def set_offline_cluster(self, cluster_args: dict = None):
Expand Down Expand Up @@ -311,8 +315,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
dataproc_cost = self.price_provider.get_container_cost()
total_cost = master_cost + workers_cost + dataproc_cost
return total_cost

def _setup_costs(self):
    # Cache the estimated cost of the source and reshaped clusters for the
    # downstream savings calculation.
    self.source_cost = self._get_cost_per_cluster(self.source_cluster)
    self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
4 changes: 3 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):

def create_saving_estimator(self,
                            source_cluster: ClusterGetAccessor,
                            reshaped_cluster: ClusterGetAccessor,
                            target_cost: float = None,
                            source_cost: float = None):
    """
    Build a savings estimator comparing the source cluster to the reshaped cluster.

    Abstract hook: platform subclasses must override this.

    :param source_cluster: accessor for the original (source) cluster.
    :param reshaped_cluster: accessor for the reshaped (migrated) cluster.
    :param target_cost: externally supplied cost for the reshaped cluster; when
                        None the estimator computes it from pricing data.
    :param source_cost: externally supplied cost for the source cluster; when
                        None the estimator computes it from pricing data.
    """
    raise NotImplementedError

def create_local_submission_job(self, job_prop, ctxt) -> Any:
Expand Down
12 changes: 9 additions & 3 deletions user_tools/src/spark_rapids_pytools/pricing/price_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,20 @@ class SavingsEstimator:
price_provider: PriceProvider
source_cluster: ClusterGetAccessor
reshaped_cluster: ClusterGetAccessor
target_cost: float = field(default=None, init=False)
source_cost: float = field(default=None, init=False)
target_cost: float = field(default=None)
source_cost: float = field(default=None)
comments: list = field(default_factory=lambda: [], init=False)
logger: Logger = field(default=None, init=False)

def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
    """
    Return the estimated cost of the given cluster.

    Abstract hook: each platform-specific SavingsEstimator subclass must
    implement this using its own price provider.
    """
    raise NotImplementedError

def _setup_costs(self):
# calculate target_cost
pass
if self.target_cost is None:
self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
if self.source_cost is None:
self.source_cost = self._get_cost_per_cluster(self.source_cluster)

def __post_init__(self):
# when debug is set to true set it in the environment.
Expand Down
15 changes: 13 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ def __process_filter_args(self, arg_val: str):
selected_filter = QualFilterApp.fromstring(default_filter_txt)
self.ctxt.set_ctxt('filterApps', selected_filter)

def _process_external_pricing_args(self):
    """Copy user-provided cluster prices from the wrapper options into the tool context."""
    # Note the key mapping: the CPU cluster price becomes the "source" cost and
    # the estimated GPU cluster price becomes the "target" cost.
    self.ctxt.set_ctxt('source_cost', self.wrapper_options.get('cpuClusterPrice'))
    self.ctxt.set_ctxt('target_cost', self.wrapper_options.get('estimatedGpuClusterPrice'))

def _process_price_discount_args(self):
def check_discount_percentage(discount_type: str, discount_value: int):
if discount_value < 0 or discount_value > 100:
Expand Down Expand Up @@ -356,6 +362,7 @@ def _process_custom_args(self):

self._process_offline_cluster_args()
self._process_eventlogs_args()
self._process_external_pricing_args()
self._process_price_discount_args()
# This is noise to dump everything
# self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
Expand Down Expand Up @@ -595,7 +602,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
reshaped_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'),
reshape_workers_cnt=lambda x: workers_cnt)
estimator_obj = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
reshaped_cluster)
reshaped_cluster,
self.ctxt.get_ctxt('target_cost'),
self.ctxt.get_ctxt('source_cost'))
saving_estimator_cache.setdefault(workers_cnt, estimator_obj)
cost_pd_series = get_costs_for_single_app(df_row, estimator_obj)
return cost_pd_series
Expand All @@ -605,7 +614,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
# initialize the savings estimator only once
reshaped_gpu_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'))
savings_estimator = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
reshaped_gpu_cluster)
reshaped_gpu_cluster,
self.ctxt.get_ctxt('target_cost'),
self.ctxt.get_ctxt('source_cost'))
app_df_set[cost_cols] = app_df_set.apply(
lambda row: get_costs_for_single_app(row, estimator=savings_estimator), axis=1)
else:
Expand Down
6 changes: 6 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ class QualifyUserArgModel(ToolUserArgModel):
target_platform: Optional[CspEnv] = None
filter_apps: Optional[QualFilterApp] = None
gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
cpu_cluster_price: Optional[float] = None
estimated_gpu_cluster_price: Optional[float] = None
cpu_discount: Optional[int] = None
gpu_discount: Optional[int] = None
global_discount: Optional[int] = None
Expand All @@ -316,6 +318,8 @@ def init_tool_args(self):
self.p_args['toolArgs']['savingsCalculations'] = True
self.p_args['toolArgs']['filterApps'] = self.filter_apps
self.p_args['toolArgs']['targetPlatform'] = self.target_platform
self.p_args['toolArgs']['cpuClusterPrice'] = self.cpu_cluster_price
self.p_args['toolArgs']['estimatedGpuClusterPrice'] = self.estimated_gpu_cluster_price
self.p_args['toolArgs']['cpuDiscount'] = self.cpu_discount
self.p_args['toolArgs']['gpuDiscount'] = self.gpu_discount
self.p_args['toolArgs']['globalDiscount'] = self.global_discount
Expand Down Expand Up @@ -412,6 +416,8 @@ def build_tools_args(self) -> dict:
'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
# used to initialize the pricing information
'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
'cpuClusterPrice': self.p_args['toolArgs']['cpuClusterPrice'],
'estimatedGpuClusterPrice': self.p_args['toolArgs']['estimatedGpuClusterPrice'],
'cpuDiscount': self.p_args['toolArgs']['cpuDiscount'],
'gpuDiscount': self.p_args['toolArgs']['gpuDiscount'],
'globalDiscount': self.p_args['toolArgs']['globalDiscount']
Expand Down
6 changes: 6 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def qualification(self,
target_platform: str = None,
output_folder: str = None,
filter_apps: str = None,
cpu_cluster_price: float = None,
estimated_gpu_cluster_price: float = None,
cpu_discount: int = None,
gpu_discount: int = None,
global_discount: int = None,
Expand Down Expand Up @@ -80,6 +82,8 @@ def qualification(self,
'Recommended', or 'Strongly Recommended' based on speedups. "SAVINGS"
lists all the apps that have positive estimated GPU savings except for the apps that
are "Not Applicable"
:param cpu_cluster_price: the CPU cluster hourly price provided by the user.
:param estimated_gpu_cluster_price: the GPU cluster hourly price provided by the user.
:param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
(e.g. 30 for 30% discount).
:param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
Expand All @@ -105,6 +109,8 @@ def qualification(self,
target_platform=target_platform,
output_folder=output_folder,
filter_apps=filter_apps,
cpu_cluster_price=cpu_cluster_price,
estimated_gpu_cluster_price=estimated_gpu_cluster_price,
cpu_discount=cpu_discount,
gpu_discount=gpu_discount,
global_discount=global_discount,
Expand Down

0 comments on commit da8f88f

Please sign in to comment.