From 730a05dc7b56750d2805ccb5d3261fe6fa938433 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 28 Oct 2024 09:35:22 -0700 Subject: [PATCH] Update error handling in python for parsing cluster information (#1394) * Handle invalid values for num worker nodes and num execs per node Signed-off-by: Partho Sarthi * Fix pylint Signed-off-by: Partho Sarthi --------- Signed-off-by: Partho Sarthi --- .../common/cluster_inference.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/common/cluster_inference.py b/user_tools/src/spark_rapids_pytools/common/cluster_inference.py index 6d8ac3db1..e5f708471 100644 --- a/user_tools/src/spark_rapids_pytools/common/cluster_inference.py +++ b/user_tools/src/spark_rapids_pytools/common/cluster_inference.py @@ -76,6 +76,12 @@ def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dic num_driver_nodes = 1 app_id = cluster_info_df.get('App ID') num_worker_nodes = cluster_info_df.get('Num Worker Nodes') + # If number of worker nodes is invalid, log error and return + if pd.isna(num_worker_nodes) or num_worker_nodes <= 0: + self._log_inference_failure(app_id, 'Number of worker nodes cannot be determined. ' + 'See logs for details.') + return None + cores_per_executor = cluster_info_df.get('Cores Per Executor') execs_per_node = cluster_info_df.get('Num Executors Per Node') total_cores_per_node = execs_per_node * cores_per_executor @@ -85,6 +91,11 @@ def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dic 'NUM_WORKER_NODES': int(num_worker_nodes), } if self.platform.get_platform_name() == CspEnv.ONPREM: + # For on-prem, if total cores per node is invalid, log error and return + if pd.isna(total_cores_per_node) or total_cores_per_node <= 0: + self._log_inference_failure(app_id, 'Total cores per node cannot be determined. ' + 'See logs for details.') + return None # For on-prem, we need to include number of cores per worker node cluster_prop['NUM_WORKER_CORES'] = int(total_cores_per_node) else: @@ -95,9 +106,10 @@ def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dic driver_node_type = self.platform.configs.get_value('clusterInference', 'defaultCpuInstances', 'driver') worker_node_type = cluster_info_df.get('Worker Node Type') if pd.isna(worker_node_type): - # If worker instance is not set, use the default value based on the number of cores - if pd.isna(total_cores_per_node): - self._log_inference_failure(app_id, 'Total cores per node cannot be determined.') + # For CSPs, if worker instance is not set and total cores per node is invalid, log error and return + if pd.isna(total_cores_per_node) or total_cores_per_node <= 0: + self._log_inference_failure(app_id, 'Total cores per node cannot be determined. ' + 'See logs for details.') return None # TODO - need to account for number of GPUs per executor worker_node_type = self.platform.get_matching_worker_node_type(total_cores_per_node)