Skip to content

Commit

Permalink
Update error handling in Python for parsing cluster information (#1394)
Browse files: browse the repository at this point in the history
* Handle invalid values for the number of worker nodes and the number of executors per node

Signed-off-by: Partho Sarthi <[email protected]>

* Fix pylint

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa authored Oct 28, 2024
1 parent 1504968 commit 730a05d
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions user_tools/src/spark_rapids_pytools/common/cluster_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dic
num_driver_nodes = 1
app_id = cluster_info_df.get('App ID')
num_worker_nodes = cluster_info_df.get('Num Worker Nodes')
# If number of worker nodes is invalid, log error and return
if pd.isna(num_worker_nodes) or num_worker_nodes <= 0:
self._log_inference_failure(app_id, 'Number of worker nodes cannot be determined. '
'See logs for details.')
return None

cores_per_executor = cluster_info_df.get('Cores Per Executor')
execs_per_node = cluster_info_df.get('Num Executors Per Node')
total_cores_per_node = execs_per_node * cores_per_executor
Expand All @@ -85,6 +91,11 @@ def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dic
'NUM_WORKER_NODES': int(num_worker_nodes),
}
if self.platform.get_platform_name() == CspEnv.ONPREM:
# For on-prem, if total cores per node is invalid, log error and return
if pd.isna(total_cores_per_node) or total_cores_per_node <= 0:
self._log_inference_failure(app_id, 'Total cores per node cannot be determined. '
'See logs for details.')
return None
# For on-prem, we need to include number of cores per worker node
cluster_prop['NUM_WORKER_CORES'] = int(total_cores_per_node)
else:
Expand All @@ -95,9 +106,10 @@ def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dic
driver_node_type = self.platform.configs.get_value('clusterInference', 'defaultCpuInstances', 'driver')
worker_node_type = cluster_info_df.get('Worker Node Type')
if pd.isna(worker_node_type):
# If worker instance is not set, use the default value based on the number of cores
if pd.isna(total_cores_per_node):
self._log_inference_failure(app_id, 'Total cores per node cannot be determined.')
# For CSPs, if worker instance is not set and total cores per node is invalid, log error and return
if pd.isna(total_cores_per_node) or total_cores_per_node <= 0:
self._log_inference_failure(app_id, 'Total cores per node cannot be determined. '
'See logs for details.')
return None
# TODO - need to account for number of GPUs per executor
worker_node_type = self.platform.get_matching_worker_node_type(total_cores_per_node)
Expand Down

0 comments on commit 730a05d

Please sign in to comment.