Fix race condition in JobsList
#7061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Conversation
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff            @@
##             main    #7061     +/-   ##
==========================================
- Coverage   77.60%   77.60%   -0.00%
==========================================
  Files         566      566
  Lines       43549    43554       +5
==========================================
+ Hits        33793    33794       +1
- Misses       9756     9760       +4
```
Does the Slurm project know about this issue?
Thanks @khsrali! Just testing your solution in the field now:
So it seems to fix the issue robustly for me. Regarding configuration, my main question is: does
Thanks @mbercx for trying this out!
Not at all, it's placed before the EBM call, and only on individual calls. I'd like to come up with some sort of bulletproof regression test for this solution, to make 100% sure that it resolves the issue.
I couldn't find any similar reported issue online. Before reporting anything, I'd like to make 100% sure that the lag in Slurm is the issue.
Great, thanks for the clarification @khsrali! In that case perhaps we can just set this sleep time to
Fully support adding a test for this. From my end: I'll run ~1000 structures with
```python
logger.info(f'scheduled request to update CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
# ...
if not node.get_last_job_info():
```
Suggest using a while loop to fetch the job info with a time interval and timeout.
A while loop with a timeout is an antipattern for dealing with remote communication. The transport already uses transport.exec_command_wait to wait for the command to finish. The while loop is a tight loop that blocks the event loop and can cause a performance downgrade. Meanwhile, the error handling is hard to standardize inside it. We don't want to shoot ourselves in the foot with such a workaround.
Currently, the code waits a fixed 5 seconds (await asyncio.sleep(5)), even if the job info becomes available earlier. A better approach is to poll periodically with a timeout, like this:

```python
interval = 1
timeout = 5
tstart = time.time()
while not node.get_last_job_info():
    if time.time() - tstart > timeout:
        break
    await asyncio.sleep(interval)
```

This approach still uses await, so it doesn’t block the event loop. It just allows earlier exit when the job info is ready, instead of always waiting the full 5 seconds.
I don't really understand your concern about this approach. It’s quite standard in async workflows where you want to wait for a condition to become true without blocking.
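As an aside, the polling pattern discussed above can be factored into a small reusable helper. This is only an illustrative sketch; the helper name `wait_for` and its default interval/timeout are made up here and are not part of the PR:

```python
import asyncio
import time


async def wait_for(condition, timeout: float = 5.0, interval: float = 1.0):
    """Poll ``condition()`` until it returns a truthy value or ``timeout`` expires.

    Yields control back to the event loop between polls, so it does not block it.
    """
    tstart = time.time()
    result = condition()
    while not result and time.time() - tstart < timeout:
        await asyncio.sleep(interval)
        result = condition()
    return result


# Hypothetical usage inside an async task, mirroring the snippet above:
#     job_info = await wait_for(node.get_last_job_info)
```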
Unfortunately this will not work, because node.get_last_job_info() is only updated in func::do_update.
I see. This becomes more complex. In this case, I would not sleep before the do_update; instead, we can handle it inside the do_update function.
We can use a flag, e.g. already_had_job_info:

```python
if job_info is None:
    if not already_had_job_info and t < timeout:
        # the job info has not appeared yet because of the Slurm lag, so we should
        # wait instead of assuming the job is done
        ...
    else:
        # do the normal handling
        ...
else:
    already_had_job_info = True  # record that we have already seen the job
    ...
```
This workaround has completely stabilized the AiiDA interactions with the Slurm cluster I'm using, thank you! While the fix itself is brilliant, it's frankly concerning that Slurm has an underlying issue that requires such an unconventional solution...
Did you check the
You may want to check this function:

```python
def get_jobs(
    self,
    jobs: list[str] | None = None,
    user: str | None = None,
    as_dict: bool = False,
) -> list[JobInfo] | dict[str, JobInfo]:
    """Return the list of currently active jobs.

    :param jobs: A list of jobs to check; only these are checked.
    :param user: A string with a user: only jobs of this user are checked.
    :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is
        returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects.
    :returns: List of active jobs.
    """
    with self.transport:
        retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user))

    joblist = self._parse_joblist_output(retval, stdout, stderr)
    if as_dict:
        jobdict = {job.job_id: job for job in joblist}
        if None in jobdict:
            raise SchedulerError('Found at least one job without jobid')
        return jobdict
    return joblist
```

@rikigigi @mbercx, can you check your daemon log to see whether you can see any of the following error/warning messages?
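For reference, a minimal sketch of how get_jobs could be used to check whether a specific job is still visible to the scheduler. The scheduler instance and the job id are assumed to exist already and are purely illustrative, not part of this PR:

```python
# Assumes `scheduler` is a configured Scheduler instance with its transport set up,
# and `job_id` is the scheduler id of the job in question (both hypothetical here).
job_id = '123456'
jobs = scheduler.get_jobs(jobs=[job_id], as_dict=True)
if job_id not in jobs:
    # The scheduler no longer reports the job: either it really finished, or
    # (with the Slurm lag discussed above) it is simply not visible yet.
    print(f'Job {job_id} not reported by the scheduler')
```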
@unkcpz and @superstar54 I think there's a misunderstanding here: the issue is not that the job is not submitted or that we don't have a job id.
Yes, no confusion on my side. My comment was only about replacing the hard-coded wait time with a more flexible approach.
#4326
Force-pushed from d146b8b to 9f8ddd1
Thanks for the ping @khsrali! I see you've found a more robust/elegant solution? I'll do a bit of field testing to see if I run into any issues. 🚀
@mbercx yes! This hopefully is the correct fix.
Force-pushed from 9f8ddd1 to 06a6b17
```python
if str(job_id) in self._inspecting_jobs:
    future.set_result(self._jobs_cache.get(job_id, None))
else:
    racing_requests[job_id] = future
```
Could you first check whether self._jobs_cache.get returns a non-None result? If so, then presumably you can set the future even if it has been racing? Or is it guaranteed to be None if the job was not in self._inspecting_jobs? But I might be misunderstanding.
Something like this:

```diff
-if str(job_id) in self._inspecting_jobs:
-    future.set_result(self._jobs_cache.get(job_id, None))
-else:
-    racing_requests[job_id] = future
+job_status = self._jobs_cache.get(job_id, None)
+if str(job_id) in self._inspecting_jobs or job_status is not None:
+    future.set_result(self._jobs_cache.get(job_id, None))
+else:
+    racing_requests[job_id] = future
```
Hmm, looking at the code in _get_jobs_from_scheduler it seems like it will always be None?
I don't understand the question
```diff
             racing_requests[job_id] = future
     finally:
-        self._job_update_requests = {}
+        self._job_update_requests = racing_requests
```
I think there is still a race condition here, since you can overwrite a poorly timed call to self._job_update_requests.setdefault in request_job_info_update.
I think instead of this finally clause we should be popping the dict items as we iterate over them above.
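A minimal sketch of the "pop as we iterate" idea, using the attribute names from this file (not the exact code that ended up in the PR):

```python
# Iterate over a snapshot of the keys so it is safe to pop from the dict inside
# the loop; each pending request is resolved and removed as it is handled,
# instead of wiping the whole dict in a `finally` clause.
for job_id in list(self._job_update_requests.keys()):
    future = self._job_update_requests.pop(job_id)
    if not future.done():
        future.set_result(self._jobs_cache.get(job_id, None))
```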
I used a better pop solution, just to be safe.
P.S. I don't think this ever happens, because the for loop actually blocks the event loop (although it's inside an async function). That means that during the stretch exactly between
self._jobs_cache = await self._get_jobs_from_scheduler() and the end of the function, self._job_update_requests is really "frozen".
It's only when it reaches the end of the function that it relinquishes control to the event loop (there's nothing async in the function apart from the await that I mentioned).
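A toy illustration of that point, unrelated to the aiida code itself: between two await points, a coroutine runs without yielding, so other coroutines cannot interleave with it:

```python
import asyncio

events = []


async def blocker():
    # No `await` inside this loop: it runs to completion in one go,
    # so no other coroutine can interleave with it.
    for i in range(3):
        events.append(f'blocker {i}')


async def interleaver():
    events.append('interleaver')


async def main():
    await asyncio.gather(blocker(), interleaver())
    print(events)  # ['blocker 0', 'blocker 1', 'blocker 2', 'interleaver']


asyncio.run(main())
```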
I've been doing some more field testing of this version of the PR:
Transport already closed issue:

```
Traceback (most recent call last):
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/utils.py", line 205, in exponential_backoff_retry
result = await coro()
^^^^^^^^^^^^
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/tasks.py", line 196, in do_update
job_info = await cancellable.with_interrupt(update_request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/utils.py", line 115, in with_interrupt
result = await next(wait_iter)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/tasks.py", line 631, in _wait_for_one
return f.result() # May raise f.exception().
^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/futures.py", line 202, in result
raise self._exception.with_traceback(self._exception_tb)
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 135, in _update_job_info
self._jobs_cache = await self._get_jobs_from_scheduler()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 97, in _get_jobs_from_scheduler
with self._transport_queue.request_transport(self._authinfo) as request:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 144, in __exit__
next(self.gen)
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/transports.py", line 122, in request_transport
transport_request.future.result().close()
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/transport.py", line 1838, in close
return self.run_command_blocking(self.close_async)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/transport.py", line 1832, in run_command_blocking
return loop.run_until_complete(func(*args, **kwargs))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/runners.py", line 160, in run_until_complete
return self._loop.run_until_complete(future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mbercx/.aiida_venvs/defect/lib/python3.12/site-packages/nest_asyncio.py", line 98, in run_until_complete
return f.result()
^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/futures.py", line 202, in result
raise self._exception.with_traceback(self._exception_tb)
File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
result = coro.send(None)
^^^^^^^^^^^^^^^
File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/plugins/ssh_async.py", line 190, in close_async
raise InvalidOperation('Cannot close the transport: it is already closed')
aiida.common.exceptions.InvalidOperation: Cannot close the transport: it is already closed
```
Thanks a lot @mbercx for the confirmation! I was not aware of #7086, but that also seems to be a separate bug. Thanks for reporting. I'm tracing back the "Transport already closed issue", but that certainly has another cause and I'm resolving it as a separate bug. Now I'm confident that this PR reasonably resolves the racing issue we had in mind. In addition, the test that I wrote simulates that scenario, and it would clearly fail before this fix.
Force-pushed from f9f7018 to 65e501c
Great that you found the root of the problem, @khsrali. I had a glimpse at the file; is it
@unkcpz
I don't have strong opinions, but as it is now, the race condition is prevented.
```diff
-for future in self._job_update_requests.values():
-    if not future.done():
-        future.set_exception(exception)
+for job_id in self._polling_jobs:
```
Thanks @khsrali for implementing my suggestion, I think the code is now clearer and less error-prone.
I still think we need to ponder one thing: is it safe to iterate over self._polling_jobs? I don't think they necessarily need to be in sync with _job_update_requests. I suspect a safer solution is to still iterate over _job_update_requests as before, and check whether job_id is in self._polling_jobs before popping out the future.
```python
for job_id in self._polling_jobs:
    future = self._job_update_requests.pop(job_id)
    if future.done():
        continue
    future.set_exception(exception)
```
I think in this except branch we can preserve the original behaviour and set the exception on everything in _job_update_requests, since I believe that is the semantic meaning of this branch? It's not quite clear what the possible exception causes are, but this is a catch-all. Something like:

```diff
-for job_id in self._polling_jobs:
-    future = self._job_update_requests.pop(job_id)
-    if future.done():
-        continue
-    future.set_exception(exception)
+for job_id in set(self._job_update_requests.keys()):
+    future = self._job_update_requests.pop(job_id)
+    if not future.done():
+        future.set_exception(exception)
```
Would be good to have a test for this, but it might be hard.
```python
for job_id in self._polling_jobs:
    future = self._job_update_requests.pop(job_id)
    if future.done():
        continue
    future.set_result(self._jobs_cache.get(job_id, None))
```
I think this is safer, because I don't see any guarantee that the job_ids in self._polling_jobs are necessarily present in self._job_update_requests, so the pop call can throw a KeyError.

```diff
-for job_id in self._polling_jobs:
-    future = self._job_update_requests.pop(job_id)
-    if future.done():
-        continue
-    future.set_result(self._jobs_cache.get(job_id, None))
+for job_id in set(self._job_update_requests.keys()):
+    # Skip job_ids that were not polled yet (e.g. when the job update request arrived
+    # between `_get_jobs_from_scheduler` and `update_job_info`).
+    # However, if the future is already done, we want to pop it from the dict regardless
+    # (hence the `or` condition).
+    if job_id in self._polling_jobs or self._job_update_requests[job_id].done():
+        future = self._job_update_requests.pop(job_id)
+        if not future.done():
+            future.set_result(self._jobs_cache.get(job_id, None))
```
I'm on holiday; I can only get back to this in two weeks.
Edit:
Problem
The JobsList class had a race condition where:
This premature "DONE" status causes several critical issues:
This issue only surfaces when using async transport plugins like core.ssh_async or aiida-firecrest, where the timing conditions make the race condition more likely to occur.
Solution
Only resolve futures for jobs that were actually inspected by the scheduler.
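A condensed sketch of that idea, based on the diff snippets quoted in the review above rather than the exact merged code:

```python
# Only requests whose job id was actually part of the scheduler inspection get
# their future resolved; the others stay pending and are answered on a later
# update cycle, instead of being prematurely resolved to None ("job done").
for job_id, future in list(self._job_update_requests.items()):
    if future.done():
        self._job_update_requests.pop(job_id)
    elif str(job_id) in self._inspecting_jobs:
        self._job_update_requests.pop(job_id)
        future.set_result(self._jobs_cache.get(job_id, None))
    # otherwise: leave the request in place until the next scheduler update
```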
Testing
Added test_prevent_racing_condition, which explicitly tests the race condition scenario.