Skip to content
Merged
3 changes: 3 additions & 0 deletions nemo_run/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@


class UnknownStatusError(Exception): ...


class PersistentSacctFailure(Exception): ...

Check notice

Code scanning / CodeQL

Statement has no effect Note

This statement has no effect.

Copilot Autofix

AI about 1 month ago

In general, to fix "statement has no effect" issues, remove or replace expression statements that do not produce side effects with an appropriate construct (e.g., pass for empty blocks, or a proper method/attribute definition if behavior was intended). For empty class bodies, pass is the standard way to indicate that no additional attributes or methods are defined.

Here, the best minimal fix is to replace the ... expression in the body of PersistentSacctFailure with pass, matching common Python style and avoiding any behavior change: the class remains a simple subclass of Exception with no extra logic. Given the consistent pattern in this file, we can also replace the ... bodies of SetValueError and UnknownStatusError with pass for consistency and to prevent the same warning there as well, all within nemo_run/exceptions.py. No new methods, imports, or definitions are required.

Suggested changeset 1
nemo_run/exceptions.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/nemo_run/exceptions.py b/nemo_run/exceptions.py
--- a/nemo_run/exceptions.py
+++ b/nemo_run/exceptions.py
@@ -14,10 +14,13 @@
 # limitations under the License.
 
 
-class SetValueError(ValueError): ...
+class SetValueError(ValueError):
+    pass
 
 
-class UnknownStatusError(Exception): ...
+class UnknownStatusError(Exception):
+    pass
 
 
-class PersistentSacctFailure(Exception): ...
+class PersistentSacctFailure(Exception):
+    pass
EOF
@@ -14,10 +14,13 @@
# limitations under the License.


class SetValueError(ValueError): ...
class SetValueError(ValueError):
pass


class UnknownStatusError(Exception): ...
class UnknownStatusError(Exception):
pass


class PersistentSacctFailure(Exception): ...
class PersistentSacctFailure(Exception):
pass
Copilot is powered by AI and may make mistakes. Always verify output.
8 changes: 7 additions & 1 deletion nemo_run/run/torchx_backend/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from nemo_run.core.execution.base import Executor
from nemo_run.core.frontend.console.api import CONSOLE
from nemo_run.exceptions import UnknownStatusError
from nemo_run.exceptions import PersistentSacctFailure, UnknownStatusError
from nemo_run.run.logs import get_logs
from nemo_run.run.torchx_backend.runner import Runner, get_runner

Expand Down Expand Up @@ -158,6 +158,12 @@ def wait_and_exit(
while tries < timeout:
try:
status = runner.wait(app_handle, wait_interval=2)
except PersistentSacctFailure as e:
logger.error(
f"sacct has been unreachable for too long for job {app_id}, cancelling: {e}"
)
runner.cancel(app_handle)
raise UnknownStatusError(str(e)) from e
except RuntimeError as e:
if "can't start new thread" in str(e) and thread_retries < 5:
thread_retries += 1
Expand Down
24 changes: 21 additions & 3 deletions nemo_run/run/torchx_backend/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@
from nemo_run.core.execution.base import Executor
from nemo_run.core.execution.slurm import SlurmBatchRequest, SlurmExecutor, SlurmJobDetails
from nemo_run.core.tunnel.client import LocalTunnel, PackagingJob, SSHTunnel, Tunnel
from nemo_run.exceptions import PersistentSacctFailure
from nemo_run.run import experiment as run_experiment
from nemo_run.run.ray.slurm import SlurmRayRequest
from nemo_run.run.torchx_backend.schedulers.api import SchedulerMixin

MAX_CONSECUTIVE_SACCT_FAILURES = 30

log: logging.Logger = logging.getLogger(__name__)
SLURM_JOB_DIRS = os.path.join(get_nemorun_home(), ".slurm_jobs")

Expand All @@ -74,6 +77,7 @@ def __init__(
self.tunnel: Optional[Tunnel] = None
super().__init__(session_name)
self.experiment = experiment
self._consecutive_sacct_failures: dict[str, int] = {}

# TODO: Move this into the SlurmExecutor
def _initialize_tunnel(self, tunnel: SSHTunnel | LocalTunnel):
Expand Down Expand Up @@ -240,9 +244,23 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
return None

assert self.tunnel, "Tunnel is None."
p = self.tunnel.run(
f"sacct --parsable2 -j {app_id}",
)
try:
p = self.tunnel.run(
f"sacct --parsable2 -j {app_id}",
)
except Exception as e:
count = self._consecutive_sacct_failures.get(app_id, 0) + 1
self._consecutive_sacct_failures[app_id] = count
if count >= MAX_CONSECUTIVE_SACCT_FAILURES:
raise PersistentSacctFailure(
f"sacct failed {count} consecutive times for job {app_id}: {e}"
) from e
log.warning(
f"Failed to query sacct for job {app_id} ({count}/{MAX_CONSECUTIVE_SACCT_FAILURES}): "
f"{e}. Treating as transient."
)
return DescribeAppResponse(app_id=app_id, state=AppState.UNKNOWN)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meaning, this will never be bubbled up. Is that fine?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this should be fine — it doesn't happen very often that sacct fails to run. But when it does fail, it's unfortunate to lose the Run session (and be left with a dangling job). I think this change is safe.

self._consecutive_sacct_failures.pop(app_id, None)
output = p.stdout.strip().split("\n")

if len(output) <= 1:
Expand Down
79 changes: 79 additions & 0 deletions test/run/torchx_backend/schedulers/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

from nemo_run.core.execution.slurm import SlurmBatchRequest, SlurmExecutor
from nemo_run.core.tunnel.client import LocalTunnel
from nemo_run.exceptions import PersistentSacctFailure
from nemo_run.run.torchx_backend.schedulers.slurm import (
MAX_CONSECUTIVE_SACCT_FAILURES,
SlurmTunnelScheduler,
TunnelLogIterator,
_get_job_dirs,
Expand Down Expand Up @@ -380,6 +382,83 @@ def test_describe_returns_unknown_on_persistent_permission_error(slurm_scheduler
assert result.state == AppState.UNKNOWN


def test_describe_returns_unknown_on_sacct_exception(slurm_scheduler, mocker):
    """A one-off sacct failure must be swallowed by describe().

    Rather than raising (which would kill the wait loop after hours of
    polling), describe() reports the non-terminal UNKNOWN state so the
    caller keeps polling until the job completes.
    """
    from torchx.specs import AppState

    mocker.patch.object(SlurmTunnelScheduler, "_initialize_tunnel")
    mocker.patch(
        "nemo_run.run.torchx_backend.schedulers.slurm._get_job_dirs",
        return_value={
            "12345": ("/path/to/job", LocalTunnel(job_dir="/path/to/tunnel"), "log*")
        },
    )

    tunnel = mock.MagicMock()
    tunnel.run.side_effect = Exception("sacct: command failed")
    slurm_scheduler.tunnel = tunnel

    response = slurm_scheduler.describe("12345")

    assert response is not None
    assert response.state == AppState.UNKNOWN


def test_describe_raises_persistent_sacct_failure_after_threshold(slurm_scheduler, mocker):
    """Once sacct has failed MAX_CONSECUTIVE_SACCT_FAILURES times in a row,
    describe() must stop treating the failure as transient and raise
    PersistentSacctFailure so the caller can cancel the job instead of
    spinning forever."""
    mocker.patch.object(SlurmTunnelScheduler, "_initialize_tunnel")
    mocker.patch(
        "nemo_run.run.torchx_backend.schedulers.slurm._get_job_dirs",
        return_value={
            "12345": ("/path/to/job", LocalTunnel(job_dir="/path/to/tunnel"), "log*")
        },
    )

    tunnel = mock.MagicMock()
    tunnel.run.side_effect = Exception("sacct: command failed")
    slurm_scheduler.tunnel = tunnel

    # Every failure below the threshold is still reported as UNKNOWN.
    for _attempt in range(MAX_CONSECUTIVE_SACCT_FAILURES - 1):
        assert slurm_scheduler.describe("12345").state == AppState.UNKNOWN

    # The call that reaches the threshold escalates.
    with pytest.raises(PersistentSacctFailure, match="12345"):
        slurm_scheduler.describe("12345")


def test_describe_resets_sacct_failure_counter_on_success(slurm_scheduler, mocker):
    """One good sacct query wipes the consecutive-failure count, so later
    transient failures are counted from scratch rather than compounding."""
    mocker.patch.object(SlurmTunnelScheduler, "_initialize_tunnel")
    mocker.patch(
        "nemo_run.run.torchx_backend.schedulers.slurm._get_job_dirs",
        return_value={
            "12345": ("/path/to/job", LocalTunnel(job_dir="/path/to/tunnel"), "log*")
        },
    )

    tunnel = mock.MagicMock()
    slurm_scheduler.tunnel = tunnel

    # Drive the counter to one short of the threshold.
    tunnel.run.side_effect = Exception("sacct: command failed")
    for _attempt in range(MAX_CONSECUTIVE_SACCT_FAILURES - 1):
        slurm_scheduler.describe("12345")

    # A valid sacct response resets the bookkeeping.
    ok = mock.MagicMock()
    ok.stdout = "JobID|JobName|State|ExitCode" + "\n" + "12345|exp.master|RUNNING|0:0"
    tunnel.run.side_effect = None
    tunnel.run.return_value = ok
    slurm_scheduler.describe("12345")

    assert slurm_scheduler._consecutive_sacct_failures.get("12345", 0) == 0

    # The next failure restarts at 1 instead of tripping the threshold.
    tunnel.run.side_effect = Exception("sacct: command failed")
    outcome = slurm_scheduler.describe("12345")
    assert outcome.state == AppState.UNKNOWN
    assert slurm_scheduler._consecutive_sacct_failures["12345"] == 1


def test_schedule_with_dependencies(slurm_scheduler, slurm_executor):
mock_request = mock.MagicMock()
mock_request.cmd = ["sbatch", "--requeue", "--parsable"]
Expand Down
13 changes: 12 additions & 1 deletion test/run/torchx_backend/test_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from torchx.specs import AppDef, AppStatus

from nemo_run.core.execution.base import Executor
from nemo_run.exceptions import UnknownStatusError
from nemo_run.exceptions import PersistentSacctFailure, UnknownStatusError
from nemo_run.run.logs import get_logs
from nemo_run.run.torchx_backend.launcher import ContextThread, launch, wait_and_exit

Expand Down Expand Up @@ -231,6 +231,17 @@ def test_wait_and_exit_other_runtime_error_propagates(mock_runner):
wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner)


def test_wait_and_exit_cancels_job_on_persistent_sacct_failure(mock_runner):
    """When the runner surfaces PersistentSacctFailure, wait_and_exit must
    cancel the dangling job and re-raise it as UnknownStatusError."""
    handle = "dummy://nemo_run/my-test-run"
    mock_runner.wait.side_effect = PersistentSacctFailure("sacct failed 30 times for 12345")

    with pytest.raises(UnknownStatusError):
        wait_and_exit(app_handle=handle, log=False, runner=mock_runner)

    mock_runner.cancel.assert_called_once_with(handle)


@patch("threading.Thread.run")
def test_context_thread_run(mocked_run, setup_and_teardown):
def test_function():
Expand Down
Loading