
Commit f9f7018

hypothetical racing prevented
1 parent 06a6b17 commit f9f7018

File tree: 1 file changed (+17, −16 lines)


src/aiida/engine/processes/calcjobs/manager.py

Lines changed: 17 additions & 16 deletions
@@ -61,7 +61,7 @@ def __init__(self, authinfo: AuthInfo, transport_queue: 'TransportQueue', last_u
         self._job_update_requests: Dict[Hashable, asyncio.Future] = {}  # Mapping: {job_id: Future}
         self._last_updated = last_updated
         self._update_handle: Optional[asyncio.TimerHandle] = None
-        self._inspecting_jobs: List[str] = []
+        self._polling_jobs: List[str] = []

     @property
     def logger(self) -> logging.Logger:
@@ -102,11 +102,11 @@ async def _get_jobs_from_scheduler(self) -> Dict[Hashable, 'JobInfo']:
             scheduler.set_transport(transport)

             kwargs: Dict[str, Any] = {'as_dict': True}
-            self._inspecting_jobs = self._get_jobs_with_scheduler()
+            self._polling_jobs = self._get_jobs_with_scheduler()
             if scheduler.get_feature('can_query_by_user'):
                 kwargs['user'] = '$USER'
             else:
-                kwargs['jobs'] = self._inspecting_jobs
+                kwargs['jobs'] = self._polling_jobs

             scheduler_response = scheduler.get_jobs(**kwargs)

@@ -121,12 +121,14 @@ async def _get_jobs_from_scheduler(self) -> Dict[Hashable, 'JobInfo']:
         return jobs_cache

     async def _update_job_info(self) -> None:
-        """Update all of the job information objects.
+        """Update job information and resolve pending requests.

         This will set the futures for all pending update requests where the corresponding job has a new status compared
         to the last update.
+        Note, _job_update_requests is dynamic, and might get new entries while polling from scheduler.
+        Therefore we only update the jobs actually polled, and the new entries will be handled in the next update.
         """
-        racing_requests = {}
+
         try:
             if not self._update_requests_outstanding():
                 return
@@ -135,9 +137,11 @@ async def _update_job_info(self) -> None:
             self._jobs_cache = await self._get_jobs_from_scheduler()
         except Exception as exception:
             # Set the exception on all the update futures
-            for future in self._job_update_requests.values():
-                if not future.done():
-                    future.set_exception(exception)
+            for job_id in self._polling_jobs:
+                future = self._job_update_requests.pop(job_id)
+                if future.done():
+                    continue
+                future.set_exception(exception)

             # Reset the `_update_handle` manually. Normally this is done in the `updating` coroutine, but since we
             # reraise this exception, that code path is never hit. If the next time a request comes in, the method
@@ -147,14 +151,11 @@ async def _update_job_info(self) -> None:

             raise
         else:
-            for job_id, future in self._job_update_requests.items():
-                if not future.done():
-                    if str(job_id) in self._inspecting_jobs:
-                        future.set_result(self._jobs_cache.get(job_id, None))
-                    else:
-                        racing_requests[job_id] = future
-        finally:
-            self._job_update_requests = racing_requests
+            for job_id in self._polling_jobs:
+                future = self._job_update_requests.pop(job_id)
+                if future.done():
+                    continue
+                future.set_result(self._jobs_cache.get(job_id, None))

     @contextlib.contextmanager
     def request_job_info_update(self, authinfo: AuthInfo, job_id: Hashable) -> Iterator['asyncio.Future[JobInfo]']:
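
For readers skimming the diff, the sketch below (plain Python, not AiiDA code; the JobPoller class and its fake_scheduler_poll helper are hypothetical stand-ins) illustrates the pattern this commit adopts in _update_job_info: snapshot the requested job ids before polling, resolve and pop only those futures, and leave any request registered while the poll was in flight for the next update cycle.

# Minimal sketch of the deferral pattern (illustrative only; JobPoller and
# fake_scheduler_poll are hypothetical, not part of the AiiDA API).
import asyncio
from typing import Dict, List, Optional


class JobPoller:
    def __init__(self) -> None:
        self.job_update_requests: Dict[str, asyncio.Future] = {}
        self.polling_jobs: List[str] = []

    def request_update(self, job_id: str) -> asyncio.Future:
        """Register interest in a job; the future resolves after a poll that covered it."""
        if job_id not in self.job_update_requests:
            self.job_update_requests[job_id] = asyncio.get_running_loop().create_future()
        return self.job_update_requests[job_id]

    async def fake_scheduler_poll(self, job_ids: List[str]) -> Dict[str, Optional[str]]:
        # Stand-in for the real scheduler query: pretend every polled job is RUNNING.
        await asyncio.sleep(0.1)
        return {job_id: 'RUNNING' for job_id in job_ids}

    async def update(self) -> None:
        # Snapshot the jobs we are about to poll; requests registered after this
        # point belong to the next cycle, analogous to self._polling_jobs in the diff.
        self.polling_jobs = list(self.job_update_requests)
        results = await self.fake_scheduler_poll(self.polling_jobs)
        for job_id in self.polling_jobs:
            future = self.job_update_requests.pop(job_id)
            if not future.done():
                future.set_result(results.get(job_id))


async def main() -> None:
    poller = JobPoller()
    early = poller.request_update('1')
    update_task = asyncio.create_task(poller.update())
    await asyncio.sleep(0.05)           # the poll is now in flight
    late = poller.request_update('2')   # registered mid-poll
    await update_task
    print(early.result())               # 'RUNNING': covered by the first poll
    print(late.done())                  # False: deferred to the next update
    await poller.update()
    print(late.result())                # 'RUNNING': resolved by the second poll


asyncio.run(main())

Compared with the previous approach of parking "racing" requests in a side dictionary and reassigning _job_update_requests in a finally block, popping only the polled ids leaves mid-poll entries untouched in the original mapping, so they cannot be lost or resolved against stale results.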
