Conversation

@khsrali
Contributor

@khsrali khsrali commented Oct 10, 2025

Edit:

Problem

The JobsList class had a race condition where:

  1. Job A requests a status update, triggering a scheduler query
  2. While the scheduler is being queried (with only Job A), Job B also requests an update
  3. After the scheduler returns (with only Job A's status), both futures are resolved
  4. Job B's future is resolved as DONE, because AiiDA assumes any job ID that disappeared from the scheduler query has completed

This premature "DONE" status causes several critical issues:

  • Premature retrieval: AiiDA attempts to retrieve output files while the job is still running
  • Corrupted files: Files may be incomplete or still being written when retrieved
  • False failure reports: Jobs still running may be incorrectly marked as failed

This issue only surfaces when using async transport plugins like core.ssh_async or aiida-firecrest, where the timing conditions make the race condition more likely to occur.

Solution

Only resolve futures for jobs that were actually inspected by the scheduler
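
A minimal sketch of the idea (simplified, hypothetical names; the actual change lives in the JobsList update logic): only futures whose job IDs were actually part of the scheduler query are resolved, while requests that arrived mid-query are carried over to the next poll.

def resolve_update_requests(requests, inspected, jobs_cache):
    """Sketch only: resolve futures for inspected jobs, carry over the rest.

    ``requests`` maps job_id -> asyncio.Future, ``inspected`` is the set of job IDs
    that was actually sent to the scheduler, ``jobs_cache`` is the parsed scheduler output.
    """
    carried_over = {}
    for job_id, future in requests.items():
        if job_id in inspected:
            # The scheduler was really asked about this job, so a missing entry
            # in jobs_cache genuinely means the job has finished.
            future.set_result(jobs_cache.get(job_id))
        else:
            # This request arrived while the query was already in flight:
            # keep it for the next scheduler poll instead of resolving it as DONE.
            carried_over[job_id] = future
    return carried_over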

Testing

Added test_prevent_racing_condition which explicitly tests the race condition scenario

@codecov

codecov bot commented Oct 10, 2025

Codecov Report

❌ Patch coverage is 46.15385% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.60%. Comparing base (27b52da) to head (65e501c).

Files with missing lines                           Patch %    Lines
src/aiida/engine/processes/calcjobs/manager.py     46.16%     7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #7061      +/-   ##
==========================================
- Coverage   77.60%   77.60%   -0.00%     
==========================================
  Files         566      566              
  Lines       43549    43554       +5     
==========================================
+ Hits        33793    33794       +1     
- Misses       9756     9760       +4     


@rikigigi
Member

Does the Slurm project know about this issue?

@mbercx
Member

mbercx commented Oct 14, 2025

Thanks @khsrali! Just testing your solution in the field now:

  • First, I managed to reproduce the issue on Thanos, see calculation job with PK 24690 (project mc3d-relax; profile dev). This happened quite quickly, like the 5th pw.x calculation I started.
  • Using your branch, I ran ~60 calculations with the same setup. None of them had the issue.

So it seems to fix the issue robustly for me. Regarding configuration, my main question is: does await asyncio.sleep(5) block one of the workers?

@khsrali
Contributor Author

khsrali commented Oct 14, 2025

Thanks @mbercx for trying this out,

does await asyncio.sleep(5) block one of the workers?

Not at all, it's placed before the EBM call, and only on individual calls.
So scheduler.get_job happens as before; only the current calcjob is delayed in receiving an update. And that delay applies only to the very first update of that calcjob, so the overhead is very small and non-blocking.
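
For illustration only (a standalone toy script, not the engine code), this is why an await asyncio.sleep(5) in one coroutine does not block other work running on the same event loop:

import asyncio
import time

t0 = time.monotonic()

async def delayed_update(job_id):
    # Mimics the delayed first status update of a single calcjob.
    await asyncio.sleep(5)
    print(f'{job_id}: first update after {time.monotonic() - t0:.1f}s')

async def other_work(name):
    # Other coroutines keep making progress while delayed_update is sleeping.
    for _ in range(5):
        await asyncio.sleep(1)
        print(f'{name}: still running at {time.monotonic() - t0:.1f}s')

async def main():
    await asyncio.gather(delayed_update('job-A'), other_work('daemon-worker'))

asyncio.run(main())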

I'd like to come up with a bulletproof regression test for this solution, to make 100% sure that it resolves the issue.

@rikigigi

Does the Slurm project know about this issue?

I couldn't find any similar issue reported online. Before reporting anything, I'd like to make 100% sure that the lag in Slurm is the issue.

@mbercx
Member

mbercx commented Oct 14, 2025

Great, thanks for the clarification @khsrali! In that case perhaps we can just set this sleep time to safe_interval? Seems appropriate, and we avoid adding another option.

Fully support adding a test for this. From my end: I'll run ~1000 structures with core.ssh_async in production and come back to you in case I still run into this premature retrieval problem.

logger.info(f'scheduled request to update CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)

if not node.get_last_job_info():
Member

Suggest using a while loop to fetch the job info with a time interval and timeout.

Member

A while loop with a timeout is an antipattern for dealing with remote communication. The transport already uses transport.exec_command_wait to wait for the command to finish. The while loop is a tight loop that blocks the event loop and can cause a performance downgrade. Meanwhile, the error handling is hard to standardize inside it. We don't want to shoot ourselves in the foot with such a workaround.

Member

Currently, the code waits a fixed 5 seconds (await asyncio.sleep(5)), even if the job info becomes available earlier. A better approach is to poll periodically with a timeout, like this:

interval = 1
timeout = 5
tstart = time.time()

while not node.get_last_job_info():
    if time.time() - tstart > timeout:
        break
    await asyncio.sleep(interval)

This approach still uses await, so it doesn’t block the event loop. It just allows earlier exit when the job info is ready, instead of always waiting the full 5 seconds.

I don't really understand your concern about this approach. It’s quite standard in async workflows where you want to wait for a condition to become true without blocking.

Contributor Author

Unfortunately this will not work, because node.get_last_job_info() is only updated in func::do_update.

Member

@superstar54 superstar54 Oct 17, 2025

I see. This becomes more complex. In this case, I would not sleep before do_update; instead, we can handle it inside the do_update function.
We can use a flag such as already_had_job_info:

if job_info is None:
    if not already_had_job_info and t < timeout:
        # the job info has not appeared yet because of the Slurm lag,
        # so we should wait instead of assuming the job is done
        ...
    else:
        # do the normal handling
        ...
else:
    already_had_job_info = True  # record that we have already seen the job
    ...
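
A slightly more concrete (purely illustrative) version of that idea, with hypothetical names for the flag and the grace period:

import time

GRACE_PERIOD = 30  # seconds to tolerate a job missing from squeue right after submission

def interpret_job_info(job_info, state):
    """Decide whether a missing job_info means 'done' or just scheduler lag.

    ``state`` is a small per-job dict carrying ``already_had_job_info`` and ``first_missing_at``.
    """
    if job_info is not None:
        state['already_had_job_info'] = True  # record that we have seen the job at least once
        state.pop('first_missing_at', None)
        return 'running'

    if not state.get('already_had_job_info'):
        # The job has never shown up in squeue yet: likely scheduler lag, not completion.
        first_missing = state.setdefault('first_missing_at', time.monotonic())
        if time.monotonic() - first_missing < GRACE_PERIOD:
            return 'wait'  # check again later instead of assuming the job is done

    return 'done'  # normal path: the job disappeared from the queue after being seen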

@rikigigi
Member

This workaround has completely stabilized the AiiDA interactions with the Slurm cluster I'm using, thank you! While the fix itself is brilliant, it's frankly concerning that Slurm has an underlying issue that requires such an unconventional solution...

@unkcpz
Member

unkcpz commented Oct 17, 2025

Did you check the stderr of the squeue command call? In the code base, the stderr was ignored. I believe that if the command times out or doesn't return the expected output, Slurm will give some reasonable information.

You may want to check this function:

    def get_jobs(
        self,
        jobs: list[str] | None = None,
        user: str | None = None,
        as_dict: bool = False,
    ) -> list[JobInfo] | dict[str, JobInfo]:
        """Return the list of currently active jobs.

        :param jobs: A list of jobs to check; only these are checked.
        :param user: A string with a user: only jobs of this user are checked.
        :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is
            returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects.
        :returns: List of active jobs.
        """
        with self.transport:
            retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user))

        joblist = self._parse_joblist_output(retval, stdout, stderr)
        if as_dict:
            jobdict = {job.job_id: job for job in joblist}
            if None in jobdict:
                raise SchedulerError('Found at least one job without jobid')
            return jobdict

        return joblist

@rikigigi @mbercx, can you check your daemon log to see whether any of the following error/warning messages appear?

                f"""squeue returned exit code {retval} (_parse_joblist_output function)
stdout='{stdout.strip()}'
stderr='{stderr.strip()}'"""
            )
        if stderr.strip():
            self.logger.warning(
                f"squeue returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'"
            )

@khsrali
Contributor Author

khsrali commented Oct 17, 2025

@unkcpz and @superstar54

I think there's a misunderstanding here: the issue is not that the job is not submitted or that we don't have a job ID.
What we think is the issue is that the update function concludes the job is complete because it cannot find it in the squeue results after submission.

@superstar54
Member

I think there's a misunderstanding here: the issue is not that the job is not submitted or that we don't have a job ID.
What we think is the issue is that the update function concludes the job is complete because it cannot find it in the squeue results after submission.

Yes, no confusion on my side. My comment was only about replacing the hard-coded wait time with a more flexible approach.

@khsrali
Contributor Author

khsrali commented Oct 17, 2025

#4326
🤔

@khsrali khsrali force-pushed the scheduler_prejudice_bug branch from d146b8b to 9f8ddd1 Compare November 3, 2025 15:27
@khsrali khsrali changed the title Fix premature retrieval of output files before job completion in async engine Fix race condition in JobsList Nov 3, 2025
@khsrali khsrali requested review from mbercx and removed request for mbercx November 3, 2025 15:56
@mbercx
Member

mbercx commented Nov 3, 2025

Thanks for the ping @khsrali! I see you've found a more robust/elegant solution? I'll do a bit of field testing to see if I run into any issues. 🚀

@khsrali
Contributor Author

khsrali commented Nov 4, 2025

@mbercx yes! This hopefully is the correct fix.
In the end the problem was not Slurm; it turned out to be a race condition in the JobManager.

@khsrali khsrali force-pushed the scheduler_prejudice_bug branch from 9f8ddd1 to 06a6b17 Compare November 4, 2025 08:54
@danielhollas danielhollas self-requested a review November 4, 2025 12:59
Comment on lines 152 to 155
if str(job_id) in self._inspecting_jobs:
    future.set_result(self._jobs_cache.get(job_id, None))
else:
    racing_requests[job_id] = future
Collaborator

Could you first check whether self._jobs_cache.get returns a non-None result? If so, then presumably you can set the future even if it has been racing? Or is it guaranteed to be None if the job was not in self._inspecting_jobs? But I might be misunderstanding.

Something like this:

Suggested change
if str(job_id) in self._inspecting_jobs:
    future.set_result(self._jobs_cache.get(job_id, None))
else:
    racing_requests[job_id] = future

job_status = self._jobs_cache.get(job_id, None)
if str(job_id) in self._inspecting_jobs or job_status is not None:
    future.set_result(self._jobs_cache.get(job_id, None))
else:
    racing_requests[job_id] = future

Collaborator

Hmm, looking at the code in _get_jobs_from_scheduler it seems like it will always be None?

Contributor Author

I don't understand the question

racing_requests[job_id] = future
finally:
self._job_update_requests = {}
self._job_update_requests = racing_requests
Collaborator

I think there is still a race condition here, since you can overwrite a poorly timed call to self._job_update_requests.setdefault in request_job_info_update.

I think instead of this finally clause we should be popping the dict items as we iterate over them above.

Contributor Author

I used a better pop solution, just to be safe.

P.S. I don't think this ever happens, because the for loop actually blocks the event loop (even though it's inside an async function). That means that between
self._jobs_cache = await self._get_jobs_from_scheduler() and the end of the function, self._job_update_requests is effectively "frozen".
It's only when execution reaches the end of the function that control is relinquished back to the event loop (there's nothing async in the function apart from the await I mentioned).
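
As a standalone illustration of that point (toy example, not the manager code): in asyncio, the code between two await points runs without interleaving, so another coroutine cannot mutate shared state in the middle of a purely synchronous block.

import asyncio

shared = {}

async def updater():
    await asyncio.sleep(0)  # the only suspension point
    # Everything below runs atomically with respect to the event loop:
    # no other coroutine gets scheduled until we hit another await (or return).
    snapshot = dict(shared)
    for key in snapshot:
        shared.pop(key)  # cannot race with writer() in here
    return snapshot

async def writer():
    shared['job'] = 'update-request'

async def main():
    # writer() can only run before or after updater()'s synchronous block, never inside it.
    await asyncio.gather(updater(), writer())

asyncio.run(main())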

@mbercx
Member

mbercx commented Nov 5, 2025

I've been doing some more field testing of this version of the PR:

  1. Submitting ~50 more calculations on Eiger, I find no more "early retrieval" issues.
  2. Running (i.e. with engine.run) a test workflow for a project here, I am encountering a few "issues". None of these break the workflow, but quite a few error tracebacks/warnings are reported that detract from the UX. Some I encounter for both core.ssh and core.ssh_async, see 🐛 FileNotFoundError related to monitors #7086. However, one issue (see below) I am only encountering for core.ssh_async, and a cursory glance suggests it could be related to race conditions, so I'm reporting it here.
Transport already closed issue
Traceback (most recent call last):
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/utils.py", line 205, in exponential_backoff_retry
    result = await coro()
             ^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/tasks.py", line 196, in do_update
    job_info = await cancellable.with_interrupt(update_request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/utils.py", line 115, in with_interrupt
    result = await next(wait_iter)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/tasks.py", line 631, in _wait_for_one
    return f.result()  # May raise f.exception().
           ^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/futures.py", line 202, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 135, in _update_job_info
    self._jobs_cache = await self._get_jobs_from_scheduler()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 97, in _get_jobs_from_scheduler
    with self._transport_queue.request_transport(self._authinfo) as request:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/transports.py", line 122, in request_transport
    transport_request.future.result().close()
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/transport.py", line 1838, in close
    return self.run_command_blocking(self.close_async)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/transport.py", line 1832, in run_command_blocking
    return loop.run_until_complete(func(*args, **kwargs))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/runners.py", line 160, in run_until_complete
    return self._loop.run_until_complete(future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/.aiida_venvs/defect/lib/python3.12/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/futures.py", line 202, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/plugins/ssh_async.py", line 190, in close_async
    raise InvalidOperation('Cannot close the transport: it is already closed')
aiida.common.exceptions.InvalidOperation: Cannot close the transport: it is already closed

@khsrali
Contributor Author

khsrali commented Nov 5, 2025

Thanks a lot @mbercx for the confirmation!

I was not aware of #7086, but that also seems to be a separate bug. Thanks for reporting.

I'm tracing back the "Transport already closed" issue, but that certainly has a different cause and I'm resolving it as a separate bug.

Now I'm confident that this PR reasonably resolves the race condition we had in mind. In addition, the test I wrote simulates that scenario and would clearly fail before this fix.

@khsrali khsrali force-pushed the scheduler_prejudice_bug branch from f9f7018 to 65e501c Compare November 5, 2025 09:54
@unkcpz
Member

unkcpz commented Nov 5, 2025

Great that you found the root of the problem, @khsrali.
Can you elaborate on which global state the operations race on? At which await point do shared state changes happen for that global state?

I had a glimpse at the file; is _jobs_cache the global state that multiple open transports can potentially try to write to? If so, shouldn't the simple solution be to put a lock on it (an asyncio.Lock)? (The actor pattern would also fit well here, using an asyncio.Queue, if it is proven that self._jobs_cache = await self._get_jobs_from_scheduler() is where the racing happens.)
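
For reference, a minimal sketch of what the lock-based alternative could look like (hypothetical class and method names, assuming the shared state is self._jobs_cache as discussed):

import asyncio

class JobsListSketch:
    """Illustrative only: serialize cache refreshes with an asyncio.Lock."""

    def __init__(self):
        self._jobs_cache = {}
        self._update_lock = asyncio.Lock()

    async def _get_jobs_from_scheduler(self):
        # Stand-in for the real scheduler query over the transport.
        await asyncio.sleep(0.1)
        return {'123': 'RUNNING'}

    async def _update_job_info(self):
        # Only one coroutine at a time may refresh the cache; concurrent callers
        # wait here and then see the freshly populated cache instead of racing on it.
        async with self._update_lock:
            self._jobs_cache = await self._get_jobs_from_scheduler()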

@khsrali
Contributor Author

khsrali commented Nov 5, 2025

@unkcpz
Yes, it's exactly here: self._jobs_cache = await self._get_jobs_from_scheduler().
I thought it was overall better to fix the racing logic in the code, but of course one could also go for an asyncio.Lock.

An asyncio.Lock might slightly slow down the process, as the pending requests would have to wait there. I don't have an estimate of by how much, though.

I don't have strong opinions, but as it stands now, the race is prevented.

for future in self._job_update_requests.values():
    if not future.done():
        future.set_exception(exception)
for job_id in self._polling_jobs:
Collaborator

Thanks @khsrali for implementing my suggestion, I think the code is now clearer and less error-prone.

I still think we need to ponder one thing: is it safe to iterate over self._polling_jobs? I don't think they necessarily need to be in sync with _job_update_requests. I suspect a safer solution is to still iterate over _job_update_requests as before, and check whether job_id is in self._polling_jobs before popping out the future.

Comment on lines +140 to +144
for job_id in self._polling_jobs:
    future = self._job_update_requests.pop(job_id)
    if future.done():
        continue
    future.set_exception(exception)
Collaborator

I think in this except branch we can preserve the original behaviour and set the exception on everything in _job_update_requests, since I believe that is the semantic meaning of this branch? It's not quite clear what the possible exception causes are, but this is a catch-all. Something like:

Suggested change
for job_id in self._polling_jobs:
    future = self._job_update_requests.pop(job_id)
    if future.done():
        continue
    future.set_exception(exception)

for job_id in set(self._job_update_requests.keys()):
    future = self._job_update_requests.pop(job_id)
    if not future.done():
        future.set_exception(exception)

Collaborator

Would be good to have a test for this, but it might be hard.

Comment on lines +154 to +158
for job_id in self._polling_jobs:
    future = self._job_update_requests.pop(job_id)
    if future.done():
        continue
    future.set_result(self._jobs_cache.get(job_id, None))
Collaborator

I think this is safer, because I don't see any guarantee that job_ids in self._polling_jobs are necessarily present in self._job_update_requests, so the pop call can throw a KeyError.

Suggested change
for job_id in self._polling_jobs:
    future = self._job_update_requests.pop(job_id)
    if future.done():
        continue
    future.set_result(self._jobs_cache.get(job_id, None))

for job_id in set(self._job_update_requests.keys()):
    # Skip job_ids that were not polled yet (e.g. when the job update request arrived in between
    # `_get_jobs_from_scheduler` and `update_job_info`).
    # However, if the future is already done, we want to pop it from the dict regardless (hence the or condition).
    future = self._job_update_requests[job_id]
    if job_id in self._polling_jobs or future.done():
        self._job_update_requests.pop(job_id)
        if not future.done():
            future.set_result(self._jobs_cache.get(job_id, None))

@khsrali
Contributor Author

khsrali commented Nov 9, 2025

I'm on holiday; I can only get back to this in two weeks.
I suggest skipping the nitpick suggestions and sticking with what's absolutely necessary.
This is an ongoing bug.

@GeigerJ2 GeigerJ2 moved this to In progress in aiida-core v2.7.2 Nov 10, 2025
@GeigerJ2 GeigerJ2 moved this from In progress to In review in aiida-core v2.7.2 Nov 11, 2025

Development

Successfully merging this pull request may close these issues.

🐛 Async Engine: retrieval of files before the job is completed?

6 participants