Skip to content

Commit 876350f

Browse files
committed
[JobRerunner] fix to also support jobs with dictionaries as parameters
As we found out, some jobs get a dictionary as parameter, and not just an OpenStack ID as the code and comments suggest. That makes things a bit complicated, as we still want to prevent jobs running on the same id with the same callbacks from running, but need to store the payload of the request (the dict) now as well for re-execution.
1 parent c655d7f commit 876350f

File tree

2 files changed

+213
-61
lines changed

2 files changed

+213
-61
lines changed

networking_nsxv3/common/synchronization.py

Lines changed: 211 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""
22
Synchronization - classes related concurrent execution scheduling and limits
33
"""
4+
from typing import Callable, Union, Optional
5+
46
import eventlet
57
eventlet.monkey_patch()
68

@@ -66,12 +68,38 @@ def retry_next(self):
6668
return False
6769

6870

71+
CBPARAMS = Union[str, dict]
72+
73+
6974
class Runnable(object):
7075

71-
def __init__(self, idn, fn, priority=Priority.LOWEST):
76+
def __init__(self, fnparams: CBPARAMS, fn: Callable[[CBPARAMS], None], priority=Priority.LOWEST):
7277
self.priority = priority
73-
self.idn = idn
74-
self.fn = fn
78+
self._fnparams = fnparams
79+
self._fn = fn
80+
81+
# contradicting to code comments, we sometimes get a dictionary
82+
# as parameter for the callback. Some data from that
83+
# dictionary is then actually used, e.g. revision and resource id.
84+
# Fixing that would require major
85+
# refactoring of the rpc and realizer objects. Not going to happen.
86+
#
87+
# Instead, in these cases we will still use the openstack id and the
88+
# name of the callback function to prevent parallel running of jobs.
89+
# But when a job is submitted to the Rerunner, it will keep track of the
90+
# dictionary contents if one exists and keep these for rerunning. So essentially
91+
# we create subjobs.
92+
# Realistically we need a better data structure than the active queue and Rerunner.
93+
94+
# fnparams is a str for most of the callbacks:
95+
if isinstance(fnparams, str):
96+
self.idn = fnparams
97+
elif isinstance(fnparams, dict):
98+
self.idn = fnparams['id']
99+
else:
100+
self.idn = str(fnparams)
101+
LOG.warning('unexpected type %s for job parameters of %s', type(fnparams), fn.__name__)
102+
75103
self._runcount = 0
76104
self._created = time.time()
77105
self._scheduled = None
@@ -81,7 +109,10 @@ def __init__(self, idn, fn, priority=Priority.LOWEST):
81109

82110
@property
83111
def identifier(self) -> tuple:
84-
return (self.idn, self.fn.__name__)
112+
return (self.idn, self._fn.__name__)
113+
114+
def debugid(self)->str:
115+
return str((self.idn, self._fn.__name__, str(self._fnparams)))
85116

86117
def set_scheduled(self):
87118
""" called when we submit the job to the worker pool """
@@ -90,7 +121,7 @@ def set_scheduled(self):
90121
self._jobdone = None
91122
self._rescheduled = None
92123

93-
def set_start(self):
124+
def _set_start(self):
94125
""" called in our wrapper when we actually start fn """
95126
# we need to reset the other timings
96127
# because we use the same job for rerunning
@@ -99,7 +130,7 @@ def set_start(self):
99130
self._rescheduled = None
100131
self._runcount += 1
101132

102-
def set_done(self):
133+
def _set_done(self):
103134
""" called in our wrapper when fn returns """
104135
self._jobdone = time.time()
105136
self._rescheduled = None
@@ -132,6 +163,13 @@ def get_statline(self) -> str:
132163
return (f"timings for job {self} - runcount: {self._runcount} age: {age} "
133164
f"scheduled: {scheduled} started: {started} runtime: {runtime} rescheduled: {rescheduled}")
134165

166+
def execute(self):
167+
self._set_start()
168+
try:
169+
self._fn(self._fnparams)
170+
finally:
171+
self._set_done()
172+
135173
def __repr__(self):
136174
# lets not just use the object id, maybe
137175
return str(self.identifier)
@@ -141,9 +179,11 @@ def __eq__(self, other):
141179
Note, the priority is not part of the comparison
142180
Thus if a runnable with higher priority is about to be
143181
added to the queue it will be rejected silently.
182+
To prevent starvation, the queue will update the priority of the
183+
existing element, in case it was lower than the item that was about to be added.
144184
"""
145185
if isinstance(other, Runnable):
146-
return (self.idn == other.idn and self.fn == other.fn)
186+
return (self._fnparams == other._fnparams and self._fn == other._fn)
147187
else:
148188
return False
149189

@@ -162,11 +202,6 @@ def __lt__(self, other):
162202

163203
return self.priority < other.priority
164204

165-
def __hash__(self):
166-
# with the original repr this is broken, as __eq__ takes the fn into
167-
# account as well!
168-
return hash((self.idn, self.fn))
169-
170205

171206
class UniqFiFoQueue(eventlet.queue.Queue):
172207
"""
@@ -238,14 +273,147 @@ def _get(self):
238273
return heapq.heappop(self.queue)
239274

240275

276+
class JobList():
    """Bookkeeping for the JobRerunner: all pending jobs of one kind.

    Jobs of one kind share an identifier (OpenStack id, callback name), but
    there can be multiple distinct jobs of the same kind when the callback
    parameter is a dict: jobs only compare equal when their full parameters
    match, so this list tracks each parameter variant separately.

    add:

    When a job is added and no equal job is stored yet, it is recorded with
    count 1 and may run (returns True).  When an equal job already exists,
    only its counter is increased and the caller must not start it
    (returns False).

    done:

    When a job is done its counter is decreased; at zero it is removed from
    the list (it was not submitted again), otherwise it stays with the
    updated counter.  done() then removes and returns the oldest pending
    job (FIFO) so the JobRerunner can re-execute it — this may be the job
    that just finished or a sibling with different parameters.
    """

    def __init__(self):
        # identifier tuple (idn, fn.__name__) common to every stored job;
        # set lazily on the first add()
        self._job_identifier: Optional[tuple] = None
        # ordered (pending_count, job) pairs, oldest first
        self._runnables: list = []

    def __len__(self):
        # number of distinct jobs (ignoring pending rerun requests)
        return len(self._runnables)

    @property
    def size(self):
        # total number of submissions, including pending rerun requests
        return sum(count for count, _ in self._runnables)

    def __repr__(self):
        return f"Joblist: {self._job_identifier}, len={len(self)}, {self._runnables}"

    def add(self, job: "Runnable") -> bool:
        """Register *job*; return True when the caller may start it now."""
        if self._job_identifier is None:
            self._job_identifier = job.identifier
        elif job.identifier != self._job_identifier:
            raise ValueError("Can only add jobs of same type to a JobList")

        # already tracked with identical parameters -> just count the request
        for index, (count, existing_job) in enumerate(self._runnables):
            if job == existing_job:
                self._runnables[index] = (count + 1, existing_job)
                return False

        # this is the first of its kind, we can run it.
        # note that a job that gets re-executed will be removed from
        # the list (with the others of the same kind still present)
        # so when it returns it will be the only one of its kind and
        # can run. after it is finished a different one will be returned by done.
        self._runnables.append((1, job))
        return True

    def _runnable_is_done(self, job: "Runnable"):
        """Decrease the counter of *job*; drop it from the list at zero."""
        for index, (count, existing_job) in enumerate(self._runnables):
            if job == existing_job:
                LOG.debug("Job %s is done, updating JobList, request count was: %d", job.debugid(), count)
                count -= 1
                if count <= 0:
                    # count < 0 means our accounting is off; log and recover
                    if count < 0:
                        LOG.warning("Job count in JobList was %d for %s", count, job.debugid())
                    del self._runnables[index]
                else:
                    self._runnables[index] = (count, existing_job)
                return
        # a job that was never add()ed must not be reported done
        raise KeyError(f'No such job {job}')

    def _runnable_pop_next(self) -> Optional["Runnable"]:
        """Remove and return the next job to rerun, or None if there is none.

        We always append new jobs, so the front of the list holds the
        oldest pending job.  Popping from the front keeps FIFO order
        (a plain pop() would be LIFO and could starve old rerun requests).
        No job of this kind is running while we are in the "done" part of
        the workflow, so any choice here is safe to start.
        """
        if not self._runnables:
            return None
        count, job = self._runnables.pop(0)
        LOG.debug("found job to run next with %d rerun requests open: %s", count, job)
        return job

    def done(self, job: "Runnable") -> Optional["Runnable"]:
        """Mark *job* finished and return the next job to rerun (or None)."""
        if self._job_identifier is None:
            raise KeyError("JobList is empty")
        if job.identifier != self._job_identifier:
            raise ValueError("Can only remove jobs of same type from a JobList")

        self._runnable_is_done(job)
        return self._runnable_pop_next()

    def get_count(self, job) -> int:
        """Return the pending submission count for *job* (0 when unknown)."""
        if self._job_identifier is not None and job.identifier != self._job_identifier:
            raise ValueError("Can only query jobs of same type from a JobList")

        for count, existing_job in self._runnables:
            if job == existing_job:
                return count

        return 0
406+
407+
241408
class JobRerunner():
242409
""" Thread save data structure to reschedule jobs when they are already running
243410
244411
When a job is retrieved from the active queue and already running in a worker thread,
245412
there is a chance that another job for the same object is added to the active
246-
queue and also started in a worker. While there then is some locking happening
247-
that should prevent race conditions, this still blocks the worker thread(s),
248-
which degrades performance and re-executes the job unnecessarily often.
413+
queue and also started in a worker. To prevent race conditions, the worker threads
414+
will use a lock to prevent two jobs running on the same objects, but this leads to
415+
blocking of each of the affected workers, which degrades performance and also re-executes
416+
the jobs unnecessarily often.
249417
250418
To prevent this, we use this JobRerunner:
251419
@@ -275,8 +443,8 @@ class JobRerunner():
275443
_lname_torerun = 'JobRerunner-torerun'
276444

277445
def __init__(self):
278-
self._running = dict()
279-
self._to_rerun = collections.deque()
446+
self._running: collections.defaultdict[(int, str),JobList] = collections.defaultdict(JobList)
447+
self._to_rerun: collections.deque[Runnable] = collections.deque()
280448

281449
def get_rerunnable(self) -> Runnable:
282450
# Let's also use the LockManager that is used for locking
@@ -296,26 +464,19 @@ def get_rerunnable(self) -> Runnable:
296464

297465
def job_done(self, job: Runnable):
298466
LOG.debug("JobRerunner job_done called for %s", job)
467+
299468
with LockManager.get_lock(self._lname_running):
300-
count = self._running.get(job, 0)
301469

302-
if count == 1:
303-
del self._running[job]
470+
joblist: JobList = self._running[job.identifier]
471+
next_job = joblist.done(job)
472+
if not next_job:
473+
del self._running[job.identifier]
304474
LOG.info("JobRerunner (done, no reruns requested) %s", job.get_statline())
305-
elif count > 1:
306-
# we only allow exactly one job to run at a time,
307-
# all jobs arriving later will increase the counter while
308-
# the job is still running or they get re-queued.
309-
# if a job is in the ready deque it will at some point
310-
# re-appear and so we can forget about the counter.
311-
LOG.info("JobRerunner (done, %d reruns requested) %s", count - 1, job.get_statline())
312-
del self._running[job]
313-
with LockManager.get_lock(self._lname_torerun):
314-
self._to_rerun.append(job)
315475
else:
316-
# prevent the error from spreading
317-
del self._running[job]
318-
LOG.warning("JobRerunner job_done called too often for job %s", job)
476+
# we got a job to rerun from our helper
477+
LOG.info("JobRerunner (done, got rerun) done: %s next: %s", job.get_statline(), next_job.get_statline())
478+
with LockManager.get_lock(self._lname_torerun):
479+
self._to_rerun.append(next_job)
319480

320481
def add_job(self, job: Runnable) -> bool:
321482
""" Add job to list of jobs running/to be started or mark it for re-execution
@@ -327,22 +488,20 @@ def add_job(self, job: Runnable) -> bool:
327488
328489
"""
329490
with LockManager.get_lock(self._lname_running):
330-
count = self._running.get(job, 0)
331-
if count <= 0:
491+
joblist:JobList = self._running[job.identifier]
492+
493+
if joblist.add(job):
332494
# no job running, we can run the job
333-
# if the counter is < 0 our accounting is wrong,
334-
# so we fix it and run the job.
335-
self._running[job] = 1
336495
LOG.debug("JobRerunner no identical job is currently running, can start %s", job)
337496
return True
338-
339-
self._running[job] = count + 1
340-
LOG.debug("JobRerunner job %s already running, marked for rescheduling, count: %d ", job, count)
497+
else:
498+
count = joblist.get_count(job)
499+
LOG.debug("JobRerunner job %s already running, marked for rescheduling, count: %d ", job, count)
341500

342501
sum = 0
343-
for job, scount in self._running.items():
344-
sum += scount
345-
LOG.debug("JobRerunner stat: job %s is running, submission count: %d", job, scount)
502+
for identifier, joblist in self._running.items():
503+
sum += joblist.size
504+
LOG.debug("JobRerunner stat: job %s is running, submission count: %d", identifier, joblist.size)
346505

347506
# let's log these as info for debugging, they should be sufficient in prod
348507
# to find issues with the JobRerunner:
@@ -379,9 +538,14 @@ def __init__(self, active_size=INFINITY, passive_size=INFINITY,
379538
def run(self, priority, ids, fn):
380539
""" Submit a job with priority
381540
541+
Note: the second parameter apparently sometimes is a dictionary, in contrast
542+
to the documentation in the code!
543+
Fixing this would require too much refactoring at the moment -- mutax
544+
382545
Keyword arguments:
383546
priority -- job priority of type Priority.class
384-
ids -- list of IDs (identifiers) that will be passed to the 'fn'
547+
ids -- list of OpenStack-IDs (identifiers) that will be passed to the 'fn'
548+
OR list of dictionaries(!) of OpenStack objects (containing their id)
385549
fn -- a function about to be executed by the runner with an argument ID
386550
"""
387551
if self._state != "started":
@@ -419,15 +583,13 @@ def _start(self):
419583
# check if we are allowed to run it,
420584
# if yes mark it as running and spawn it
421585
if self._rerunner.add_job(job):
422-
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__))
586+
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job))
423587

424588
# ideally we would be able to add a callback to the
425589
# greenthread, but this is hidden in the pool, so
426590
# let's wrap the function once more.
427591
def wrap(rerun, ajob):
428-
ajob.set_start()
429-
ajob.fn(ajob.idn)
430-
ajob.set_done()
592+
ajob.execute()
431593
rerun.job_done(ajob)
432594

433595
job.set_scheduled()

0 commit comments

Comments
 (0)