
Commit 22c7e21

Rex (rexcsn) authored and committed
Use subprocess based method to read shared files
* Avoid reading NFS shared files with `open` directly, because when NFS is not available (head node down) the `open` call will hang forever.
* Use `subprocess.run`, which has built-in timeout functionality, to copy the shared file to local first, or `cat` the file directly.
* This will prevent computemgtd from hanging even if NFS directories are not available.

Signed-off-by: Rex <[email protected]>
1 parent eb1cf0d commit 22c7e21
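The pattern behind the commit, as a minimal standalone sketch (the `read_shared_file` helper and everything in it are illustrative, not code from this commit): a plain `open()` on a dead NFS mount can block indefinitely in the kernel, whereas a child process doing the read can be abandoned after a timeout.

import subprocess

DEFAULT_COMMAND_TIMEOUT = 30  # seconds; mirrors the constant this commit adds


def read_shared_file(path, timeout=DEFAULT_COMMAND_TIMEOUT):
    """Read a shared (NFS) file without risking an indefinite hang."""
    # The child process performs the potentially blocking read; if the NFS
    # server is unreachable, subprocess.TimeoutExpired is raised after
    # `timeout` seconds instead of the caller hanging forever inside open().
    result = subprocess.run(
        ["cat", path],
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,  # non-zero exit (e.g. missing file) raises CalledProcessError
    )
    return result.stdout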

File tree

5 files changed: +62 −25 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ This file is used to list changes made in each version of the aws-parallelcluster
 - SGE: always use shortname as hostname filter with `qstat`. This will make nodewatcher more robust when using custom DHCP option, where the full hostname seen by `SGE` might differ from the hostname returned from EC2 metadata(local-hostname).
 - Transition from IMDSv1 to IMDSv2.
 - Have `computemgtd` reuse last available daemon configuration when the new one cannot be loaded.
+- Use methods with timeouts to read NFS shared files, which will prevent `computemgtd` from hanging when NFS filesystems are not available.

 **BUG FIXES**
 - Fix a bug that caused `clustermgtd` to not immediately replace instances with failed status check that are in replacement process.

src/slurm_plugin/common.py

Lines changed: 22 additions & 10 deletions
@@ -21,7 +21,7 @@
 from botocore.config import Config
 from botocore.exceptions import ClientError
 from common.schedulers.slurm_commands import InvalidNodenameError, parse_nodename, update_nodes
-from common.utils import grouper
+from common.utils import check_command_output, grouper

 CONFIG_FILE_DIR = "/etc/parallelcluster/slurm_plugin"
 EC2Instance = collections.namedtuple("EC2Instance", ["id", "private_ip", "hostname", "launch_time"])
@@ -49,6 +49,7 @@
 # timestamp used by clustermgtd and computemgtd should be in default ISO format
 # YYYY-MM-DDTHH:MM:SS.ffffff+HH:MM[:SS[.ffffff]]
 TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"
+DEFAULT_COMMAND_TIMEOUT = 30


 logger = logging.getLogger(__name__)
@@ -430,16 +431,27 @@ def retrieve_instance_type_mapping(file_path):
         raise


-def _get_clustermgtd_heartbeat(clustermgtd_heartbeat_file_path):
+def get_clustermgtd_heartbeat(clustermgtd_heartbeat_file_path):
     """Get clustermgtd's last heartbeat."""
-    with open(clustermgtd_heartbeat_file_path, "r") as timestamp_file:
-        # Note: heartbeat must be written with datetime.strftime to convert localized datetime into str
-        # datetime.strptime will not work with str(datetime)
-        # Example timestamp written to heartbeat file: 2020-07-30 19:34:02.613338+00:00
-        return datetime.strptime(timestamp_file.read().strip(), TIMESTAMP_FORMAT)
+    # Use subprocess based method to read shared file to prevent hanging when NFS is down
+    # Do not copy to local. Different users need to access the file, but file should be writable by root only
+    # Only use last line of output to avoid taking unexpected output in stdout
+    heartbeat = (
+        check_command_output(
+            f"cat {clustermgtd_heartbeat_file_path}",
+            timeout=DEFAULT_COMMAND_TIMEOUT,
+            shell=True,  # nosec
+        )
+        .splitlines()[-1]
+        .strip()
+    )
+    # Note: heartbeat must be written with datetime.strftime to convert localized datetime into str
+    # datetime.strptime will not work with str(datetime)
+    # Example timestamp written to heartbeat file: 2020-07-30 19:34:02.613338+00:00
+    return datetime.strptime(heartbeat, TIMESTAMP_FORMAT)


-def _expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout):
+def expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout):
     """Test if clustermgtd heartbeat is expired."""
     if time_is_up(last_heartbeat, current_time, clustermgtd_timeout):
         logger.error(
@@ -454,9 +466,9 @@ def _expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout):

 def is_clustermgtd_heartbeat_valid(current_time, clustermgtd_timeout, clustermgtd_heartbeat_file_path):
     try:
-        last_heartbeat = _get_clustermgtd_heartbeat(clustermgtd_heartbeat_file_path)
+        last_heartbeat = get_clustermgtd_heartbeat(clustermgtd_heartbeat_file_path)
         logger.info("Latest heartbeat from clustermgtd: %s", last_heartbeat)
-        return not _expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout)
+        return not expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout)
     except Exception as e:
         logger.error("Unable to retrieve clustermgtd heartbeat with exception: %s", e)
         return False
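A rough standalone equivalent of the new `get_clustermgtd_heartbeat`, using only the standard library (`check_command_output` is this repo's own wrapper and its exact signature is assumed from the call above, so this sketch substitutes `subprocess.run`):

import subprocess
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"
DEFAULT_COMMAND_TIMEOUT = 30


def get_heartbeat(heartbeat_file_path):
    # Bounded read through a child process; an NFS hang becomes TimeoutExpired.
    output = subprocess.run(
        f"cat {heartbeat_file_path}",
        shell=True,  # matches the commit's shell=True call
        capture_output=True,
        text=True,
        timeout=DEFAULT_COMMAND_TIMEOUT,
        check=True,
    ).stdout
    # Only the last line is trusted, in case anything else leaks into stdout.
    heartbeat = output.splitlines()[-1].strip()
    # Example heartbeat line: 2020-07-30 19:34:02.613338+00:00
    return datetime.strptime(heartbeat, TIMESTAMP_FORMAT)

Note the design choice called out in the inline comments: the heartbeat file is read in place rather than copied locally, because several users need to read it while only root should be able to write it.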

src/slurm_plugin/computemgtd.py

Lines changed: 29 additions & 10 deletions
@@ -21,9 +21,16 @@
 from botocore.config import Config
 from common.schedulers.slurm_commands import get_nodes_info
 from common.time_utils import seconds
-from common.utils import get_metadata, sleep_remaining_loop_time
+from common.utils import get_metadata, run_command, sleep_remaining_loop_time
 from retrying import retry
-from slurm_plugin.common import CONFIG_FILE_DIR, InstanceManager, is_clustermgtd_heartbeat_valid, log_exception
+from slurm_plugin.common import (
+    CONFIG_FILE_DIR,
+    DEFAULT_COMMAND_TIMEOUT,
+    InstanceManager,
+    expired_clustermgtd_heartbeat,
+    get_clustermgtd_heartbeat,
+    log_exception,
+)

 LOOP_TIME = 60
 RELOAD_CONFIG_ITERATIONS = 10
@@ -53,14 +60,20 @@ def __repr__(self):
         attrs = ", ".join(["{key}={value}".format(key=key, value=repr(value)) for key, value in self.__dict__.items()])
         return "{class_name}({attrs})".format(class_name=self.__class__.__name__, attrs=attrs)

-    @log_exception(log, "reading computemgtd config", catch_exception=IOError, raise_on_error=True)
+    @log_exception(log, "reading computemgtd config", catch_exception=Exception, raise_on_error=True)
     def _get_config(self, config_file_path):
         """Get computemgtd configuration."""
         log.info("Reading %s", config_file_path)
         config = ConfigParser()
         try:
-            config.read_file(open(config_file_path, "r"))
-        except IOError:
+            # Use subprocess based method to copy shared file to local to prevent hanging when NFS is down
+            run_command(
+                f"cat {config_file_path} > {CONFIG_FILE_DIR}/.computemgtd_config.local",
+                timeout=DEFAULT_COMMAND_TIMEOUT,
+                shell=True,  # nosec
+            )
+            config.read_file(open(f"{CONFIG_FILE_DIR}/.computemgtd_config.local", "r"))
+        except Exception:
             log.error(f"Cannot read computemgtd configuration file: {config_file_path}")
             raise

@@ -99,11 +112,10 @@ def _get_config(self, config_file_path):
     def _read_nodename_from_file(nodename_file_path):
         """Read self nodename from a file."""
         try:
-            log.info("Reading self nodename from %s", nodename_file_path)
             with open(nodename_file_path, "r") as nodename_file:
                 nodename = nodename_file.read()
             return nodename
-        except IOError as e:
+        except Exception as e:
             log.error("Unable to read self nodename from %s with exception: %s\n", nodename_file_path, e)
             raise

@@ -187,9 +199,16 @@ def _run_computemgtd():
             reload_config_counter -= 1

         # Check heartbeat
-        if not is_clustermgtd_heartbeat_valid(
-            current_time, computemgtd_config.clustermgtd_timeout, computemgtd_config.clustermgtd_heartbeat_file_path
-        ):
+        try:
+            last_heartbeat = get_clustermgtd_heartbeat(computemgtd_config.clustermgtd_heartbeat_file_path)
+            log.info("Latest heartbeat from clustermgtd: %s", last_heartbeat)
+        except Exception as e:
+            log.warning(
+                "Unable to retrieve clustermgtd heartbeat. Using last known heartbeat: %s with exception: %s",
+                last_heartbeat,
+                e,
+            )
+        if expired_clustermgtd_heartbeat(last_heartbeat, current_time, computemgtd_config.clustermgtd_timeout):
             if computemgtd_config.disable_computemgtd_actions:
                 log.info("All computemgtd actions currently disabled")
             elif _is_self_node_down(computemgtd_config.nodename):
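`_get_config` uses the other variant of the trick: a bounded `cat ... > local_copy`, then a normal `open()` on the local path, which can no longer block on the network. A hedged sketch of that pattern (the helper name and hard-coded paths are illustrative):

import subprocess
from configparser import ConfigParser

LOCAL_COPY = "/etc/parallelcluster/slurm_plugin/.computemgtd_config.local"


def load_shared_config(shared_config_path, timeout=30):
    # Copy with a timeout: if NFS is down this raises TimeoutExpired
    # instead of wedging the daemon on the shared path.
    subprocess.run(
        f"cat {shared_config_path} > {LOCAL_COPY}",
        shell=True,
        timeout=timeout,
        check=True,
    )
    config = ConfigParser()
    with open(LOCAL_COPY, "r") as local_file:  # purely local read
        config.read_file(local_file)
    return config

Note also the main-loop change: the heartbeat read is now allowed to fail, in which case the daemon logs a warning and keeps using the last heartbeat it managed to read, so a transient NFS outage only triggers action once the clustermgtd timeout actually expires.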

tests/slurm_plugin/test_common.py

Lines changed: 7 additions & 4 deletions
@@ -13,7 +13,7 @@
 import logging
 import os
 from datetime import datetime, timedelta, timezone
-from unittest.mock import call, mock_open
+from unittest.mock import call

 import botocore
 import pytest
@@ -26,7 +26,7 @@
     EC2Instance,
     EC2InstanceHealthState,
     InstanceManager,
-    _get_clustermgtd_heartbeat,
+    get_clustermgtd_heartbeat,
     time_is_up,
 )

@@ -1363,5 +1363,8 @@ def test_time_is_up(initial_time, current_time, grace_time, expected_result):
     ],
 )
 def test_get_clustermgtd_heartbeat(time, expected_parsed_time, mocker):
-    mocker.patch("slurm_plugin.common.open", mock_open(read_data=time.strftime(TIMESTAMP_FORMAT)))
-    assert_that(_get_clustermgtd_heartbeat("some file path")).is_equal_to(expected_parsed_time)
+    mocker.patch(
+        "slurm_plugin.common.check_command_output",
+        return_value=f"some_random_stdout\n{time.strftime(TIMESTAMP_FORMAT)}",
+    )
+    assert_that(get_clustermgtd_heartbeat("some file path")).is_equal_to(expected_parsed_time)
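The two-line `return_value` in the patched `check_command_output` is deliberate: it verifies that the parser keeps only the last line of stdout. The behavior under test, in isolation:

output = "some_random_stdout\n2020-07-30 19:34:02.613338+00:00"
assert output.splitlines()[-1].strip() == "2020-07-30 19:34:02.613338+00:00"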

tests/slurm_plugin/test_computemgtd.py

Lines changed: 3 additions & 1 deletion
@@ -61,7 +61,9 @@
 )
 def test_computemgtd_config(config_file, expected_attributes, test_datadir, mocker):
     mocker.patch("slurm_plugin.computemgtd.ComputemgtdConfig._read_nodename_from_file", return_value="some_nodename")
-    compute_config = ComputemgtdConfig(test_datadir / config_file)
+    mocker.patch("slurm_plugin.computemgtd.run_command")
+    mocker.patch("slurm_plugin.computemgtd.open", return_value=open(test_datadir / config_file, "r"))
+    compute_config = ComputemgtdConfig("mocked_config_path")
     for key in expected_attributes:
         assert_that(compute_config.__dict__.get(key)).is_equal_to(expected_attributes.get(key))
