[FEA] Add qualification user tool options to support external pricing #595

Merged
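This change lets users supply their own cluster prices to the qualification wrapper instead of relying solely on the built-in pricing catalogs. Two new options, cpu_cluster_price and estimated_gpu_cluster_price, travel from the CLI through the argument processor into every platform's savings estimator, where they pre-seed source_cost and target_cost. A rough sketch of the resulting call, assuming the Fire-based entry-point class is ToolsCLI (the class name is not shown in this diff); options other than the two new prices are pre-existing and illustrative:

    from spark_rapids_tools.cmdli.tools_cli import ToolsCLI  # assumed entry point

    ToolsCLI().qualification(
        eventlogs='/path/to/eventlogs',       # assumed pre-existing option
        platform='emr',                       # assumed pre-existing option
        cpu_cluster_price=5.20,               # new: hourly price of the source CPU cluster
        estimated_gpu_cluster_price=7.80)     # new: hourly price of the target GPU cluster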
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
@@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -79,7 +81,9 @@ def create_saving_estimator(self,
                                                             pricing_configs={'databricks': pricing_config})
         saving_estimator = DBAWSSavingsEstimator(price_provider=databricks_price_provider,
                                                  reshaped_cluster=reshaped_cluster,
-                                                 source_cluster=source_cluster)
+                                                 source_cluster=source_cluster,
+                                                 target_cost=target_cost,
+                                                 source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -310,8 +314,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             cost = self.price_provider.get_instance_price(instance=instance_type)
             dbu_cost += cost * nodes_cnt
         return self.__calculate_ec2_cost(cluster) + dbu_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -69,7 +69,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -79,7 +81,9 @@ def create_saving_estimator(self,
                                                          pricing_configs={'databricks-azure': pricing_config})
         saving_estimator = DBAzureSavingsEstimator(price_provider=db_azure_price_provider,
                                                    reshaped_cluster=reshaped_cluster,
-                                                   source_cluster=source_cluster)
+                                                   source_cluster=source_cluster,
+                                                   target_cost=target_cost,
+                                                   source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -381,8 +385,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             cost = self.price_provider.get_instance_price(instance=instance_type)
             db_azure_cost += cost * nodes_cnt
         return db_azure_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -99,7 +99,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config,
@@ -110,7 +112,9 @@ def create_saving_estimator(self,
                                                  pricing_configs={'gcloud': pricing_config})
         saving_estimator = DataprocSavingsEstimator(price_provider=pricing_provider,
                                                     reshaped_cluster=reshaped_cluster,
-                                                    source_cluster=source_cluster)
+                                                    source_cluster=source_cluster,
+                                                    target_cost=target_cost,
+                                                    source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -532,8 +536,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
         workers_cost = self.__calculate_group_cost(cluster, SparkNodeType.WORKER)
         dataproc_cost = self.price_provider.get_container_cost()
         return master_cost + workers_cost + dataproc_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -95,7 +95,9 @@ def validate_job_submission_args(self, submission_args: dict) -> dict:
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raw_pricing_config = self.configs.get_value_silent('pricing')
         if raw_pricing_config:
             pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, file_load=False)
@@ -105,7 +107,9 @@ def create_saving_estimator(self,
                                                    pricing_configs={'emr': pricing_config})
         saving_estimator = EmrSavingsEstimator(price_provider=emr_price_provider,
                                                reshaped_cluster=reshaped_cluster,
-                                               source_cluster=source_cluster)
+                                               source_cluster=source_cluster,
+                                               target_cost=target_cost,
+                                               source_cost=source_cost)
         return saving_estimator
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
@@ -493,8 +497,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             total_cost += self._calculate_ec2_cost(cluster, node_type)
             total_cost += self._calculate_emr_cost(cluster, node_type)
         return total_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
13 changes: 6 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -82,7 +82,9 @@ def get_footer_message(self) -> str:
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         if self.platform == 'dataproc':
             region = 'us-central1'
             raw_pricing_config = self.configs.get_value_silent('csp_pricing')
@@ -95,7 +97,9 @@ def create_saving_estimator(self,
                                                  pricing_configs={'gcloud': pricing_config})
         saving_estimator = OnpremSavingsEstimator(price_provider=pricing_provider,
                                                   reshaped_cluster=reshaped_cluster,
-                                                  source_cluster=source_cluster)
+                                                  source_cluster=source_cluster,
+                                                  target_cost=target_cost,
+                                                  source_cost=source_cost)
         return saving_estimator
 
     def set_offline_cluster(self, cluster_args: dict = None):
@@ -311,8 +315,3 @@ def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
             dataproc_cost = self.price_provider.get_container_cost()
             total_cost = master_cost + workers_cost + dataproc_cost
         return total_cost
-
-    def _setup_costs(self):
-        # calculate target_cost
-        self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
-        self.source_cost = self._get_cost_per_cluster(self.source_cluster)
4 changes: 3 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -806,7 +806,9 @@ def migrate_cluster_to_gpu(self, orig_cluster):
 
     def create_saving_estimator(self,
                                 source_cluster: ClusterGetAccessor,
-                                reshaped_cluster: ClusterGetAccessor):
+                                reshaped_cluster: ClusterGetAccessor,
+                                target_cost: float = None,
+                                source_cost: float = None):
         raise NotImplementedError
 
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
12 changes: 9 additions & 3 deletions user_tools/src/spark_rapids_pytools/pricing/price_provider.py
@@ -127,14 +127,20 @@ class SavingsEstimator:
     price_provider: PriceProvider
     source_cluster: ClusterGetAccessor
     reshaped_cluster: ClusterGetAccessor
-    target_cost: float = field(default=None, init=False)
-    source_cost: float = field(default=None, init=False)
+    target_cost: float = field(default=None)
+    source_cost: float = field(default=None)
     comments: list = field(default_factory=lambda: [], init=False)
     logger: Logger = field(default=None, init=False)
 
     def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
         raise NotImplementedError
 
     def _setup_costs(self):
         # calculate target_cost
-        pass
+        if self.target_cost is None:
+            self.target_cost = self._get_cost_per_cluster(self.reshaped_cluster)
+        if self.source_cost is None:
+            self.source_cost = self._get_cost_per_cluster(self.source_cluster)
 
     def __post_init__(self):
         # when debug is set to true set it in the environment.
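The behavioral core of the patch is the reworked SavingsEstimator._setup_costs above: a user-supplied price now wins, and the per-platform pricing lookup runs only as a fallback for whichever cost is still None. A minimal, self-contained sketch of that precedence pattern (the stub class and its hard-coded prices are illustrative, not the real estimator):

    from dataclasses import dataclass

    @dataclass
    class StubSavingsEstimator:
        target_cost: float = None
        source_cost: float = None

        def _get_cost_per_cluster(self, cluster: str) -> float:
            # stand-in for the per-platform pricing lookup
            return {'cpu': 10.0, 'gpu': 6.0}[cluster]

        def _setup_costs(self):
            # externally supplied prices take precedence over catalog pricing
            if self.target_cost is None:
                self.target_cost = self._get_cost_per_cluster('gpu')
            if self.source_cost is None:
                self.source_cost = self._get_cost_per_cluster('cpu')

    est = StubSavingsEstimator(source_cost=12.5)  # user priced only the CPU cluster
    est._setup_costs()
    print(est.source_cost, est.target_cost)       # 12.5 6.0

Note that dropping init=False from the target_cost and source_cost fields is what allows the platform code to pass them through the dataclass constructor in the first place.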
15 changes: 13 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -291,6 +291,12 @@ def __process_filter_args(self, arg_val: str):
             selected_filter = QualFilterApp.fromstring(default_filter_txt)
         self.ctxt.set_ctxt('filterApps', selected_filter)
 
+    def _process_external_pricing_args(self):
+        cpu_cluster_price = self.wrapper_options.get('cpuClusterPrice')
+        estimated_gpu_cluster_price = self.wrapper_options.get('estimatedGpuClusterPrice')
+        self.ctxt.set_ctxt('source_cost', cpu_cluster_price)
+        self.ctxt.set_ctxt('target_cost', estimated_gpu_cluster_price)
+
     def _process_price_discount_args(self):
         def check_discount_percentage(discount_type: str, discount_value: int):
             if discount_value < 0 or discount_value > 100:
@@ -356,6 +362,7 @@ def _process_custom_args(self):
 
         self._process_offline_cluster_args()
         self._process_eventlogs_args()
+        self._process_external_pricing_args()
         self._process_price_discount_args()
         # This is noise to dump everything
         # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
@@ -595,7 +602,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
                 reshaped_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'),
                                                   reshape_workers_cnt=lambda x: workers_cnt)
                 estimator_obj = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
-                                                                           reshaped_cluster)
+                                                                           reshaped_cluster,
+                                                                           self.ctxt.get_ctxt('target_cost'),
+                                                                           self.ctxt.get_ctxt('source_cost'))
                 saving_estimator_cache.setdefault(workers_cnt, estimator_obj)
                 cost_pd_series = get_costs_for_single_app(df_row, estimator_obj)
             return cost_pd_series
@@ -605,7 +614,9 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
             # initialize the savings estimator only once
             reshaped_gpu_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'))
             savings_estimator = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'),
-                                                                           reshaped_gpu_cluster)
+                                                                           reshaped_gpu_cluster,
+                                                                           self.ctxt.get_ctxt('target_cost'),
+                                                                           self.ctxt.get_ctxt('source_cost'))
             app_df_set[cost_cols] = app_df_set.apply(
                 lambda row: get_costs_for_single_app(row, estimator=savings_estimator), axis=1)
         else:
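The naming hand-off across layers is easy to lose: the CLI's cpu_cluster_price becomes the camelCase wrapper option cpuClusterPrice and is finally stored as source_cost, while estimated_gpu_cluster_price maps to target_cost. A hypothetical trace of the values through the layers (plain dicts stand in for the real argument and context objects; only the key names come from the diff):

    # CLI layer: snake_case keyword arguments
    cli_kwargs = {'cpu_cluster_price': 5.20, 'estimated_gpu_cluster_price': 7.80}

    # argprocessor layer: camelCase tool arguments
    tool_args = {
        'cpuClusterPrice': cli_kwargs['cpu_cluster_price'],
        'estimatedGpuClusterPrice': cli_kwargs['estimated_gpu_cluster_price'],
    }

    # qualification wrapper: estimator context keys (CPU -> source, GPU -> target)
    ctxt = {
        'source_cost': tool_args['cpuClusterPrice'],
        'target_cost': tool_args['estimatedGpuClusterPrice'],
    }
    assert ctxt == {'source_cost': 5.2, 'target_cost': 7.8}

One consequence of this wiring: when an external GPU price is given, every reshaped cluster variant created in the per-row path above receives the same target_cost, so the reshape-dependent pricing lookup is effectively bypassed for those rows.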
6 changes: 6 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -307,6 +307,8 @@ class QualifyUserArgModel(ToolUserArgModel):
     target_platform: Optional[CspEnv] = None
     filter_apps: Optional[QualFilterApp] = None
     gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
+    cpu_cluster_price: Optional[float] = None
+    estimated_gpu_cluster_price: Optional[float] = None
     cpu_discount: Optional[int] = None
     gpu_discount: Optional[int] = None
     global_discount: Optional[int] = None
@@ -316,6 +318,8 @@ def init_tool_args(self):
         self.p_args['toolArgs']['savingsCalculations'] = True
         self.p_args['toolArgs']['filterApps'] = self.filter_apps
         self.p_args['toolArgs']['targetPlatform'] = self.target_platform
+        self.p_args['toolArgs']['cpuClusterPrice'] = self.cpu_cluster_price
+        self.p_args['toolArgs']['estimatedGpuClusterPrice'] = self.estimated_gpu_cluster_price
         self.p_args['toolArgs']['cpuDiscount'] = self.cpu_discount
         self.p_args['toolArgs']['gpuDiscount'] = self.gpu_discount
         self.p_args['toolArgs']['globalDiscount'] = self.global_discount
@@ -412,6 +416,8 @@ def build_tools_args(self) -> dict:
             'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
             # used to initialize the pricing information
             'targetPlatform': self.p_args['toolArgs']['targetPlatform'],
+            'cpuClusterPrice': self.p_args['toolArgs']['cpuClusterPrice'],
+            'estimatedGpuClusterPrice': self.p_args['toolArgs']['estimatedGpuClusterPrice'],
             'cpuDiscount': self.p_args['toolArgs']['cpuDiscount'],
             'gpuDiscount': self.p_args['toolArgs']['gpuDiscount'],
             'globalDiscount': self.p_args['toolArgs']['globalDiscount']
6 changes: 6 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -41,6 +41,8 @@ def qualification(self,
                       target_platform: str = None,
                       output_folder: str = None,
                       filter_apps: str = None,
+                      cpu_cluster_price: float = None,
+                      estimated_gpu_cluster_price: float = None,
                       cpu_discount: int = None,
                       gpu_discount: int = None,
                       global_discount: int = None,
@@ -80,6 +82,8 @@ def qualification(self,
                'Recommended', or 'Strongly Recommended' based on speedups. "SAVINGS"
                lists all the apps that have positive estimated GPU savings except for the apps that
                are "Not Applicable"
+        :param cpu_cluster_price: the CPU cluster hourly price provided by the user.
+        :param estimated_gpu_cluster_price: the GPU cluster hourly price provided by the user.
         :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
                (e.g. 30 for 30% discount).
         :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value
@@ -105,6 +109,8 @@ def qualification(self,
             target_platform=target_platform,
             output_folder=output_folder,
             filter_apps=filter_apps,
+            cpu_cluster_price=cpu_cluster_price,
+            estimated_gpu_cluster_price=estimated_gpu_cluster_price,
             cpu_discount=cpu_discount,
             gpu_discount=gpu_discount,
             global_discount=global_discount,