
Commit 7bb99a9

[Develop] Backing instance max count updates - _is_node_in_replacement_valid(), is_backing_instance_valid(), and unit tests (#622)
* Add logic to `is_backing_instance_valid()` to check the IP and make sure the instance matches what is being tracked (#618)

  The missing-instance map did not track which IP address was associated with the slurm node, so if a new instance was launched before the previous one became healthy, the count in the missing-instance map was never reset. This change uses a class object to track the data and links the node name to the IP. Also use `is_backing_instance_valid()` in `is_state_healthy()` instead of the plain `node.instance` check, to allow for the delay in EC2 eventual consistency.

* Refactor logic in `_is_node_in_replacement_valid()` to account for `node.instance` being `None` (#620)

* Add unit tests to cover max_count > 0 in `_is_node_in_replacement_valid()`
1 parent 293efb7 commit 7bb99a9
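
The fix keys the "missing backing instance" counter to the node's current nodeaddr, so a node whose IP changes mid-countdown (for example because a replacement instance was launched) restarts its grace iterations instead of inheriting the stale count. The sketch below illustrates that bookkeeping in isolation; the `MissingInstance` class mirrors the one added in slurm_resources.py, but the function signature and the `count_map`/`max_count` names are simplified stand-ins for the plugin's real arguments, not its actual API.

    class MissingInstance:
        """Tracks how many clustermgtd iterations a node has been missing its backing EC2 instance."""

        def __init__(self, name, ip, count):
            self.name = name
            self.ip = ip
            self.count = count


    def is_backing_instance_valid(node_name, nodeaddr, has_instance, count_map, max_count):
        """Return True while the node is still within its grace iterations, False once they are exhausted."""
        if has_instance:
            # Healthy again: drop any pending count for this node.
            count_map.pop(node_name, None)
            return True
        entry = count_map.get(node_name)
        count = entry.count if entry else 0
        if entry and entry.ip != nodeaddr:
            # The nodeaddr changed (e.g. a new instance was launched), so restart the countdown.
            count = 0
        if count >= max_count:
            # Grace iterations exhausted: treat the node as unhealthy.
            count_map.pop(node_name, None)
            return False
        count_map[node_name] = MissingInstance(node_name, nodeaddr, count + 1)
        return True


    # The counter restarts when the node's IP changes mid-countdown:
    count_map = {}
    is_backing_instance_valid("queue1-st-c5xlarge-1", "ip-1", False, count_map, max_count=2)  # count -> 1
    is_backing_instance_valid("queue1-st-c5xlarge-1", "ip-2", False, count_map, max_count=2)  # reset, count -> 1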

File tree: 4 files changed (+95 −19)

    src/slurm_plugin/clustermgtd.py
    src/slurm_plugin/slurm_resources.py
    tests/slurm_plugin/slurm_resources/test_slurm_resources.py
    tests/slurm_plugin/test_clustermgtd.py

src/slurm_plugin/clustermgtd.py

Lines changed: 18 additions & 4 deletions
@@ -1200,17 +1200,31 @@ def _find_active_nodes(partitions_name_map):
             active_nodes += partition.slurm_nodes
         return list(dict.fromkeys(active_nodes))
 
-    def _is_node_in_replacement_valid(self, node, check_node_is_valid):
+    def _is_node_in_replacement_valid(self, node: SlurmNode, check_node_is_valid):
         """
         Check node is replacement timeout or in replacement.
 
         If check_node_is_valid=True, check whether a node is in replacement,
         If check_node_is_valid=False, check whether a node is replacement timeout.
         """
-        if node.instance and node.name in self._static_nodes_in_replacement:
-            time_is_expired = time_is_up(
-                node.instance.launch_time, self._current_time, grace_time=self._config.node_replacement_timeout
+        log.debug(f"Checking if node is in replacement {node}")
+        if (
+            node.is_backing_instance_valid(
+                self._config.ec2_instance_missing_max_count,
+                self._nodes_without_backing_instance_count_map,
+                log_warn_if_unhealthy=True,
+            )
+            and node.name in self._static_nodes_in_replacement
+        ):
+            # Set `time_is_expired` to `False` if `node.instance` is `None` since we don't have a launch time yet
+            time_is_expired = (
+                False
+                if not node.instance
+                else time_is_up(
+                    node.instance.launch_time, self._current_time, grace_time=self._config.node_replacement_timeout
+                )
             )
+            log.debug(f"Node {node} is in replacement and timer expired? {time_is_expired}, instance? {node.instance}")
             return not time_is_expired if check_node_is_valid else time_is_expired
         return False
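
The new `time_is_expired` expression above encodes the second bullet of the commit message: while EC2 data is still catching up, `node.instance` can be `None`, so there is no launch time to hand to `time_is_up()` and the replacement timer is simply treated as not expired rather than raising an AttributeError. A stripped-down sketch of that guard, with simplified stand-ins for the plugin's helpers:

    from datetime import datetime, timedelta

    def time_is_up(launch_time, current_time, grace_time):
        """Simplified stand-in: True once grace_time seconds have elapsed since launch_time."""
        return (current_time - launch_time) >= timedelta(seconds=grace_time)

    def replacement_timer_expired(instance, current_time, timeout):
        # No backing instance yet: nothing to time against, so the timer cannot have expired.
        return False if instance is None else time_is_up(instance.launch_time, current_time, timeout)

    # A node still waiting for its instance is never reported as timed out.
    assert replacement_timer_expired(None, datetime(2020, 1, 1, 0, 0, 30), 30) is False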

src/slurm_plugin/slurm_resources.py

Lines changed: 20 additions & 4 deletions
@@ -180,6 +180,17 @@ class SlurmReservation:
     users: str
 
 
+class MissingInstance:
+    name: str
+    ip: str
+    count: int
+
+    def __init__(self, name, ip, count):
+        self.name = name
+        self.ip = ip
+        self.count = count
+
+
 class SlurmNode(metaclass=ABCMeta):
     SLURM_SCONTROL_COMPLETING_STATE = "COMPLETING"
     SLURM_SCONTROL_BUSY_STATES = {"MIXED", "ALLOCATED", SLURM_SCONTROL_COMPLETING_STATE}
@@ -427,7 +438,7 @@ def is_powering_down_with_nodeaddr(self):
     def is_backing_instance_valid(
         self,
         ec2_instance_missing_max_count,
-        nodes_without_backing_instance_count_map: dict,
+        nodes_without_backing_instance_count_map: dict[str, MissingInstance],
         log_warn_if_unhealthy=True,
     ):
         """Check if a slurm node's addr is set, it points to a valid instance in EC2."""
@@ -445,7 +456,11 @@ def is_backing_instance_valid(
             )
             # Allow a few iterations for the eventual consistency of EC2 data
             logger.debug(f"Map of slurm nodes without backing instances {nodes_without_backing_instance_count_map}")
-            missing_instance_loop_count = nodes_without_backing_instance_count_map.get(self.name, 0)
+            missing_instance = nodes_without_backing_instance_count_map.get(self.name, None)
+            missing_instance_loop_count = missing_instance.count if missing_instance else 0
+            if missing_instance and self.nodeaddr != missing_instance.ip:
+                # Reset the loop count since the nodeaddr has changed
+                missing_instance_loop_count = 0
             # If the loop count has been reached, the instance is unhealthy and will be terminated
             if missing_instance_loop_count >= ec2_instance_missing_max_count:
                 if log_warn_if_unhealthy:
@@ -454,11 +469,12 @@
                 nodes_without_backing_instance_count_map.pop(self.name, None)
                 self.ec2_backing_instance_valid = False
             else:
-                nodes_without_backing_instance_count_map[self.name] = missing_instance_loop_count + 1
+                instance_to_add = MissingInstance(self.name, self.nodeaddr, missing_instance_loop_count + 1)
+                nodes_without_backing_instance_count_map[self.name] = instance_to_add
                 if log_warn_if_unhealthy:
                     logger.warning(
                         f"Incrementing missing EC2 instance count for node {self.name} to "
-                        f"{nodes_without_backing_instance_count_map[self.name]}."
+                        f"{nodes_without_backing_instance_count_map[self.name].count}."
                     )
         else:
             # Remove the slurm node from the map since the instance is healthy

tests/slurm_plugin/slurm_resources/test_slurm_resources.py

Lines changed: 18 additions & 6 deletions
@@ -18,6 +18,7 @@
     DynamicNode,
     EC2InstanceHealthState,
     InvalidNodenameError,
+    MissingInstance,
     SlurmPartition,
     SlurmResumeJob,
     StaticNode,
@@ -1185,26 +1186,34 @@ def test_slurm_node_is_powering_down_with_nodeaddr(node, expected_result):
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+POWER", "queue1"),
             None,
             2,
-            {"queue1-st-c5xlarge-1": 1},
-            {"queue1-st-c5xlarge-1": 2},
+            {"queue1-st-c5xlarge-1": MissingInstance("queue1-st-c5xlarge-1", "ip-1", 1)},
+            {"queue1-st-c5xlarge-1": MissingInstance("queue1-st-c5xlarge-1", "ip-1", 2)},
             True,
         ),
         (
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+POWER", "queue1"),
             None,
             2,
-            {"queue1-st-c5xlarge-1": 2},
-            {"queue1-st-c5xlarge-1": 2},
+            {"queue1-st-c5xlarge-1": MissingInstance("queue1-st-c5xlarge-1", "ip-1", 2)},
+            {"queue1-st-c5xlarge-1": MissingInstance("queue1-st-c5xlarge-1", "ip-1", 2)},
             False,
         ),
         (
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+POWER", "queue1"),
             "Instance",
             2,
-            {"queue1-st-c5xlarge-1": 3},
+            {"queue1-st-c5xlarge-1": MissingInstance("queue1-st-c5xlarge-1", "ip-1", 3)},
             {},
             True,
         ),
+        (
+            StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+POWER", "queue1"),
+            "Instance",
+            3,
+            {"queue1-st-c5xlarge-1": MissingInstance("queue1-st-c5xlarge-1", "ip-2", 2)},
+            {"queue1-st-c5xlarge-1": MissingInstance("queue1-st-c5xlarge-1", "ip-1", 1)},
+            True,
+        ),
     ],
     ids=[
         "static_no_backing_zero_max_count",
@@ -1214,6 +1223,7 @@ def test_slurm_node_is_powering_down_with_nodeaddr(node, expected_result):
         "static_no_backing_count_not_exceeded",
         "static_no_backing_with_count_exceeded",
         "static_backed_with_count_exceeded",
+        "static_no_backing_count_not_exceeded_with_wrong_ip",
     ],
 )
 def test_slurm_node_is_backing_instance_valid(node, instance, max_count, count_map, final_map, expected_result):
@@ -1225,7 +1235,9 @@ def test_slurm_node_is_backing_instance_valid(node, instance, max_count, count_m
     ).is_equal_to(expected_result)
     assert_that(node.ec2_backing_instance_valid).is_equal_to(expected_result)
     if count_map:
-        assert_that(count_map[node.name]).is_equal_to(final_map.get(node.name, None))
+        assert_that(count_map[node.name].count).is_equal_to(final_map.get(node.name, None).count)
+        assert_that(count_map[node.name].ip).is_equal_to(final_map.get(node.name, None).ip)
+        assert_that(count_map[node.name].ip).is_equal_to(node.nodeaddr)
 
 
 @pytest.mark.parametrize(

tests/slurm_plugin/test_clustermgtd.py

Lines changed: 39 additions & 5 deletions
@@ -767,6 +767,7 @@ def test_handle_health_check(
         region="region",
         boto3_config=None,
         fleet_config={},
+        ec2_instance_missing_max_count=0,
     )
 
     cluster_manager = ClusterManager(mock_sync_config)
@@ -831,6 +832,7 @@ def test_update_static_nodes_in_replacement(current_replacing_nodes, slurm_nodes
         region="region",
         boto3_config=None,
         fleet_config={},
+        ec2_instance_missing_max_count=0,
     )
     cluster_manager = ClusterManager(mock_sync_config)
     cluster_manager._static_nodes_in_replacement = current_replacing_nodes
@@ -2646,43 +2648,61 @@ def initialize_console_logger_mock(mocker):
 
 
 @pytest.mark.parametrize(
-    "current_replacing_nodes, node, instance, current_time, expected_result",
+    "current_replacing_nodes, node, instance, current_time, max_count, expected_result",
     [
         (
             set(),
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD", "queue1"),
             EC2Instance("id-1", "ip-1", "hostname", {"ip-1"}, datetime(2020, 1, 1, 0, 0, 0)),
             datetime(2020, 1, 1, 0, 0, 29),
+            0,
             False,
         ),
         (
             {"queue1-st-c5xlarge-1"},
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD", "queue1"),
             None,
             datetime(2020, 1, 1, 0, 0, 29),
+            0,
             False,
         ),
         (
             {"queue1-st-c5xlarge-1"},
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "DOWN+CLOUD", "queue1"),
             EC2Instance("id-1", "ip-1", "hostname", {"ip-1"}, datetime(2020, 1, 1, 0, 0, 0)),
             datetime(2020, 1, 1, 0, 0, 29),
+            0,
             True,
         ),
         (
             {"queue1-st-c5xlarge-1"},
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD", "queue1"),
             EC2Instance("id-1", "ip-1", "hostname", {"ip-1"}, datetime(2020, 1, 1, 0, 0, 0)),
             datetime(2020, 1, 1, 0, 0, 30),
+            0,
             False,
         ),
+        (
+            {"queue1-st-c5xlarge-1"},
+            StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD", "queue1"),
+            None,
+            datetime(2020, 1, 1, 0, 0, 30),
+            1,
+            True,
+        ),
+    ],
+    ids=[
+        "not_in_replacement",
+        "no-backing-instance",
+        "in_replacement",
+        "timeout",
+        "no-backing-instance-with-max-count",
     ],
-    ids=["not_in_replacement", "no-backing-instance", "in_replacement", "timeout"],
 )
 @pytest.mark.usefixtures(
     "initialize_instance_manager_mock", "initialize_executor_mock", "initialize_console_logger_mock"
 )
-def test_is_node_being_replaced(current_replacing_nodes, node, instance, current_time, expected_result):
+def test_is_node_being_replaced(current_replacing_nodes, node, instance, current_time, max_count, expected_result):
     mock_sync_config = SimpleNamespace(
         node_replacement_timeout=30,
         insufficient_capacity_timeout=3,
@@ -2691,6 +2711,7 @@ def test_is_node_being_replaced(current_replacing_nodes, node, instance, current
         region="region",
         boto3_config=None,
         fleet_config={},
+        ec2_instance_missing_max_count=max_count,
     )
     cluster_manager = ClusterManager(mock_sync_config)
     cluster_manager._current_time = current_time
@@ -2700,24 +2721,34 @@
 
 
 @pytest.mark.parametrize(
-    "node, instance, current_node_in_replacement, is_replacement_timeout",
+    "node, instance, current_node_in_replacement, max_count, is_replacement_timeout",
     [
         (
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "DOWN+CLOUD+NOT_RESPONDING", "queue1"),
             None,
             {"queue1-st-c5xlarge-1"},
+            0,
+            False,
+        ),
+        (
+            StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "DOWN+CLOUD+NOT_RESPONDING", "queue1"),
+            None,
+            {"queue1-st-c5xlarge-1"},
+            1,
             False,
         ),
         (
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "DOWN+CLOUD+NOT_RESPONDING", "queue1"),
             EC2Instance("id-1", "ip-1", "hostname", {"ip-1"}, datetime(2020, 1, 1, 0, 0, 0)),
             {"queue1-st-c5xlarge-1"},
+            0,
             True,
         ),
         (
             DynamicNode("queue1-dy-c5xlarge-1", "ip-1", "hostname", "MIXED+CLOUD+NOT_RESPONDING+POWERING_UP", "queue1"),
             None,
             {"some_node_in_replacement"},
+            0,
             False,
         ),
         (
@@ -2730,20 +2761,22 @@ def test_is_node_being_replaced(current_replacing_nodes, node, instance, current
             ),
             EC2Instance("id-1", "ip-1", "hostname", {"ip-1"}, datetime(2020, 1, 1, 0, 0, 0)),
             {"some_node_in_replacement"},
+            0,
             False,
         ),
         (
             StaticNode("queue1-st-c5xlarge-1", "ip-1", "hostname", "DOWN+CLOUD+NOT_RESPONDING", "queue1"),
             EC2Instance("id-1", "ip-1", "hostname", {"ip-1"}, datetime(2020, 1, 1, 0, 0, 0)),
             {"some_node_in_replacement"},
+            0,
             False,
         ),
     ],
 )
 @pytest.mark.usefixtures(
     "initialize_instance_manager_mock", "initialize_executor_mock", "initialize_console_logger_mock"
 )
-def test_is_node_replacement_timeout(node, current_node_in_replacement, is_replacement_timeout, instance):
+def test_is_node_replacement_timeout(node, current_node_in_replacement, max_count, is_replacement_timeout, instance):
     node.instance = instance
     mock_sync_config = SimpleNamespace(
         node_replacement_timeout=30,
@@ -2753,6 +2786,7 @@ def test_is_node_replacement_timeout(node, current_node_in_replacement, is_repla
         region="region",
         boto3_config=None,
         fleet_config={},
+        ec2_instance_missing_max_count=0,
    )
     cluster_manager = ClusterManager(mock_sync_config)
     cluster_manager._current_time = datetime(2020, 1, 2, 0, 0, 0)
