Skip to content

Commit e516072

Browse files
committed
[JobRerunner] fix to also support jobs with dictionaries as parameters
As we found out, some jobs get a dictionary as parameter, and not just an OpenStack ID as the code and comments suggest. That makes things a bit complicated, as we still want to prevent jobs running on the same id with the same callbacks from running, but need to store the payload of the request (the dict) now as well for re-execution.
1 parent c655d7f commit e516072

File tree

2 files changed

+215
-61
lines changed

2 files changed

+215
-61
lines changed

networking_nsxv3/common/synchronization.py

Lines changed: 213 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""
22
Synchronization - classes related concurrent execution scheduling and limits
33
"""
4+
from typing import Callable, Union, Optional
5+
46
import eventlet
57
eventlet.monkey_patch()
68

@@ -66,12 +68,40 @@ def retry_next(self):
6668
return False
6769

6870

71+
CBPARAMS = Union[str, dict]
72+
73+
6974
class Runnable(object):
7075

71-
def __init__(self, idn, fn, priority=Priority.LOWEST):
76+
def __init__(self, fnparams: CBPARAMS, fn: Callable[[CBPARAMS], None], priority=Priority.LOWEST):
7277
self.priority = priority
73-
self.idn = idn
74-
self.fn = fn
78+
79+
# for consistent hashing, these must never be changed, when a Runnable is used as hashable:
80+
self._fnparams = fnparams
81+
self._fn = fn
82+
83+
# contradicting to code comments, we sometimes get a dictionary
84+
# as parameter for the callback. Some data from that
85+
# dictionary is then actually used, e.g. revision and resource id.
86+
# Not going to happen. Fixing that would require major
87+
# refactoring of the rpc and realizer objects. Not going to happen.
88+
#
89+
# Instead, in these cases we will still use the openstack id and the
90+
# name of the callback function to prevent parallel running of jobs.
91+
# But when a job is submitted to the Rerunner, it will keep track of the
92+
# dictionary contents if one exists and keep these for rerunning. So essentially
93+
# we create subjobs.
94+
# Realistically we need a better data structure than the active queue and Rerunner.
95+
96+
# fnparams is a str for most of the callbacks:
97+
if isinstance(fnparams, str):
98+
self.idn = fnparams
99+
elif isinstance(fnparams, dict):
100+
self.idn = fnparams['id']
101+
else:
102+
self.idn = str(fnparams)
103+
LOG.warning('unexpected type %s for job parameters of %s', type(fnparams), fn.__name__)
104+
75105
self._runcount = 0
76106
self._created = time.time()
77107
self._scheduled = None
@@ -81,7 +111,10 @@ def __init__(self, idn, fn, priority=Priority.LOWEST):
81111

82112
@property
83113
def identifier(self) -> tuple:
84-
return (self.idn, self.fn.__name__)
114+
return (self.idn, self._fn.__name__)
115+
116+
def debugid(self)->str:
117+
return str((self.idn, self._fn.__name__, str(self._fnparams)))
85118

86119
def set_scheduled(self):
87120
""" called when we submit the job to the worker pool """
@@ -90,7 +123,7 @@ def set_scheduled(self):
90123
self._jobdone = None
91124
self._rescheduled = None
92125

93-
def set_start(self):
126+
def _set_start(self):
94127
""" called in our wrapper when we actually start fn """
95128
# we need to reset the other timings
96129
# because we use the same job for rerunning
@@ -99,7 +132,7 @@ def set_start(self):
99132
self._rescheduled = None
100133
self._runcount += 1
101134

102-
def set_done(self):
135+
def _set_done(self):
103136
""" called in our wrapper when fn returns """
104137
self._jobdone = time.time()
105138
self._rescheduled = None
@@ -132,6 +165,13 @@ def get_statline(self) -> str:
132165
return (f"timings for job {self} - runcount: {self._runcount} age: {age} "
133166
f"scheduled: {scheduled} started: {started} runtime: {runtime} rescheduled: {rescheduled}")
134167

168+
def execute(self):
169+
self._set_start()
170+
try:
171+
self._fn(self._fnparams)
172+
finally:
173+
self._set_done()
174+
135175
def __repr__(self):
136176
# lets not just use the object id, maybe
137177
return str(self.identifier)
@@ -141,9 +181,11 @@ def __eq__(self, other):
141181
Note, the priority is not part of the comparison
142182
Thus if a runnable with higher priority is about to be
143183
added to the queue it will be rejected silently.
184+
To prevent starvation, the queue will update the priority of the
185+
existing element, in case it was lower than the item that was about to be added.
144186
"""
145187
if isinstance(other, Runnable):
146-
return (self.idn == other.idn and self.fn == other.fn)
188+
return (self._fnparams == other._fnparams and self._fn == other._fn)
147189
else:
148190
return False
149191

@@ -162,11 +204,6 @@ def __lt__(self, other):
162204

163205
return self.priority < other.priority
164206

165-
def __hash__(self):
166-
# with the original repr this is broken, as __eq__ takes the fn into
167-
# account as well!
168-
return hash((self.idn, self.fn))
169-
170207

171208
class UniqFiFoQueue(eventlet.queue.Queue):
172209
"""
@@ -238,14 +275,147 @@ def _get(self):
238275
return heapq.heappop(self.queue)
239276

240277

278+
class JobList():
279+
"""
280+
We are keeping track of the jobs and their parameters for the
281+
JobRerunner, based on the type of job and openstack id.
282+
There can be multiple similar jobs (same id, same method) but
283+
with different parameters, if the parameter is a dict.
284+
In these cases this JobList will keep track of the jobs, because
285+
a job with the same additional parameters will compare equal.
286+
287+
add:
288+
289+
When a job is added the _runnables list can be empty, then the job
290+
can run and we store it here for reference, with count 1.
291+
If the list is not empty the job can either already exist or it
292+
can be a job with different additional parameters.
293+
294+
If the job already exists, we increase the counter, and do not allow it to run.
295+
If the job does not exist, we add it to the list with count = 1 and allow it to run.
296+
297+
done:
298+
299+
When a job is done, we will look at our list, and decrease the counter.
300+
If the counter is 0, the job was not submitted a second time, and we can remove
301+
it from the list.
302+
If the counter is not 0, the job was requested to run again, and we keep it in the
303+
list with the updated counter.
304+
305+
done will then choose a job from the list, that is supposed to run again,
306+
remove it from the list and return it to the JobRerunner.
307+
308+
This might be the same job that was just finished or it could be a different one.
309+
For now we will choose the oldest one based on age, which should be the same job that
310+
was just done, but we might want to change that so we use a helper function for that
311+
for now in the POC.
312+
313+
"""
314+
def __init__(self):
315+
self._job_identifier: Optional[str] = None
316+
self._runnables: List[tuple[int, Runnable]] = []
317+
318+
def __len__(self):
319+
return len(self._runnables)
320+
321+
@property
322+
def size(self):
323+
return sum( count for count, _ in self._runnables)
324+
325+
def __repr__(self):
326+
return f"Joblist: {self._job_identifier}, len={len(self)}, {self._runnables}"
327+
328+
def add(self, job:Runnable)->bool:
329+
330+
if self._job_identifier is not None:
331+
if job.identifier != self._job_identifier:
332+
raise ValueError("Can only add jobs of same type to a JobList")
333+
else:
334+
self._job_identifier = job.identifier
335+
336+
# search through our list and update the counter or append the job:
337+
for index, (count, existing_job) in enumerate(self._runnables):
338+
if job == existing_job:
339+
count += 1
340+
self._runnables[index] = (count, existing_job)
341+
return False
342+
else:
343+
# this is the first of its kind, we can run it.
344+
# note that a job that gets re-executed will be removed from
345+
# the list (with the others of the same kind still present)
346+
# so when it returns it will be the only one of its kind and
347+
# can run. after it is finished a different one will be returned by done.
348+
self._runnables.append((1, job))
349+
return True
350+
351+
def _runnable_is_done(self, job:Runnable):
352+
# search through our list and update the counter or remove the job:
353+
for index, (count, existing_job) in enumerate(self._runnables):
354+
if job == existing_job:
355+
# we do not need this job with this parameters again, it is done,
356+
# so we remove it from the list.
357+
# Note: the list might not be empty!
358+
LOG.debug("Job %s is done, updating JobList, request count was: %d", job.debugid(), count)
359+
count -= 1
360+
if count <= 0:
361+
if count < 0:
362+
LOG.warning("Job count in JobList was %d for %s", count, job.debugid())
363+
del self._runnables[index]
364+
return
365+
self._runnables[index] = (count, existing_job)
366+
return
367+
else:
368+
# we should never mark a job done, that was not added in the first place,
369+
# if its not in the list, something is wrong
370+
raise KeyError(f'No such job {job}')
371+
372+
def _runnable_pop_next(self) -> Optional[Runnable]:
373+
""" find the next job to run and remove it from the list,
374+
or return None if there is None to run.
375+
"""
376+
if not self._runnables:
377+
return None
378+
# no job should currently be running, because we are in the "done" part of the
379+
# workflow. So we can choose any job we like, remove it from the list and run it.
380+
# when that job returns and needs no re-execution it will be removed from the list,
381+
# and we will not pop it here again, so the next one in line will be returned.
382+
# we always append to our list, so we can just pop the first one here and get the oldest.
383+
count, job = self._runnables.pop()
384+
LOG.debug("found job to run next with %d rerun requests open: %s", count, job)
385+
return job
386+
387+
def done(self, job:Runnable)->Optional[Runnable]:
388+
if self._job_identifier is not None:
389+
if job.identifier != self._job_identifier:
390+
raise ValueError("Can only remove jobs of same type from a JobList")
391+
else:
392+
raise KeyError("JobList is empty")
393+
394+
self._runnable_is_done(job)
395+
return self._runnable_pop_next()
396+
397+
def get_count(self, job):
398+
399+
if self._job_identifier is not None:
400+
if job.identifier != self._job_identifier:
401+
raise ValueError("Can only remove jobs of same type from a JobList")
402+
403+
for count, existing_job in self._runnables:
404+
if job == existing_job:
405+
return count
406+
407+
return 0
408+
409+
241410
class JobRerunner():
242411
""" Thread save data structure to reschedule jobs when they are already running
243412
244413
When a job is retrieved from the active queue and already running in a worker thread,
245414
there is a chance that another job for the same object is added to the active
246-
queue and also started in a worker. While there then is some locking happening
247-
that should prevent race conditions, this still blocks the worker thread(s),
248-
which degrades performance and re-executes the job unnecessarily often.
415+
queue and also started in a worker. To prevent race conditions, the worker threads
416+
will use a lock to prevent two jobs running on the same objects, but this leads to
417+
blocking of each of the affected workers, which degrades performance and also re-executes
418+
the jobs unnecessarily often.
249419
250420
To prevent this, we use this JobRerunner:
251421
@@ -275,8 +445,8 @@ class JobRerunner():
275445
_lname_torerun = 'JobRerunner-torerun'
276446

277447
def __init__(self):
278-
self._running = dict()
279-
self._to_rerun = collections.deque()
448+
self._running: collections.defaultdict[(int, str),JobList] = collections.defaultdict(JobList)
449+
self._to_rerun: collections.deque[Runnable] = collections.deque()
280450

281451
def get_rerunnable(self) -> Runnable:
282452
# Let's also use the LockManager that is used for locking
@@ -296,26 +466,19 @@ def get_rerunnable(self) -> Runnable:
296466

297467
def job_done(self, job: Runnable):
298468
LOG.debug("JobRerunner job_done called for %s", job)
469+
299470
with LockManager.get_lock(self._lname_running):
300-
count = self._running.get(job, 0)
301471

302-
if count == 1:
303-
del self._running[job]
472+
joblist: JobList = self._running[job.identifier]
473+
next_job = joblist.done(job)
474+
if not next_job:
475+
del self._running[job.identifier]
304476
LOG.info("JobRerunner (done, no reruns requested) %s", job.get_statline())
305-
elif count > 1:
306-
# we only allow exactly one job to run at a time,
307-
# all jobs arriving later will increase the counter while
308-
# the job is still running or they get re-queued.
309-
# if a job is in the ready deque it will at some point
310-
# re-appear and so we can forget about the counter.
311-
LOG.info("JobRerunner (done, %d reruns requested) %s", count - 1, job.get_statline())
312-
del self._running[job]
313-
with LockManager.get_lock(self._lname_torerun):
314-
self._to_rerun.append(job)
315477
else:
316-
# prevent the error from spreading
317-
del self._running[job]
318-
LOG.warning("JobRerunner job_done called too often for job %s", job)
478+
# we got a job to rerun from our helper
479+
LOG.info("JobRerunner (done, got rerun) done: %s next: %s", job.get_statline(), next_job.get_statline())
480+
with LockManager.get_lock(self._lname_torerun):
481+
self._to_rerun.append(next_job)
319482

320483
def add_job(self, job: Runnable) -> bool:
321484
""" Add job to list of jobs running/to be started or mark it for re-execution
@@ -327,22 +490,20 @@ def add_job(self, job: Runnable) -> bool:
327490
328491
"""
329492
with LockManager.get_lock(self._lname_running):
330-
count = self._running.get(job, 0)
331-
if count <= 0:
493+
joblist:JobList = self._running[job.identifier]
494+
495+
if joblist.add(job):
332496
# no job running, we can run the job
333-
# if the counter is < 0 our accounting is wrong,
334-
# so we fix it and run the job.
335-
self._running[job] = 1
336497
LOG.debug("JobRerunner no identical job is currently running, can start %s", job)
337498
return True
338-
339-
self._running[job] = count + 1
340-
LOG.debug("JobRerunner job %s already running, marked for rescheduling, count: %d ", job, count)
499+
else:
500+
count = joblist.get_count(job)
501+
LOG.debug("JobRerunner job %s already running, marked for rescheduling, count: %d ", job, count)
341502

342503
sum = 0
343-
for job, scount in self._running.items():
344-
sum += scount
345-
LOG.debug("JobRerunner stat: job %s is running, submission count: %d", job, scount)
504+
for identifier, joblist in self._running.items():
505+
sum += joblist.size
506+
LOG.debug("JobRerunner stat: job %s is running, submission count: %d", identifier, joblist.size)
346507

347508
# let's log these as info for debugging, they should be sufficient in prod
348509
# to find issues with the JobRerunner:
@@ -379,9 +540,14 @@ def __init__(self, active_size=INFINITY, passive_size=INFINITY,
379540
def run(self, priority, ids, fn):
380541
""" Submit a job with priority
381542
543+
Note: the second parameter apparently sometimes is a dictionary, in contrast
544+
to the documentation in the code!
545+
Fixing this would requires too much refactoring at the moment -- mutax
546+
382547
Keyword arguments:
383548
priority -- job priority of type Priority.class
384-
ids -- list of IDs (identifiers) that will be passed to the 'fn'
549+
ids -- list of OpenStack-IDs (identifiers) that will be passed to the 'fn'
550+
OR list of dictionaries(!) of OpenStack objects (containing their id)
385551
fn -- a function about to be executed by the runner with an argument ID
386552
"""
387553
if self._state != "started":
@@ -419,15 +585,13 @@ def _start(self):
419585
# check if we are allowed to run it,
420586
# if yes mark it as running and spawn it
421587
if self._rerunner.add_job(job):
422-
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__))
588+
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job))
423589

424590
# ideally we would be able to add a callback to the
425591
# greenthread, but this is hidden in the pool, so
426592
# let's wrap the function once more.
427593
def wrap(rerun, ajob):
428-
ajob.set_start()
429-
ajob.fn(ajob.idn)
430-
ajob.set_done()
594+
ajob.execute()
431595
rerun.job_done(ajob)
432596

433597
job.set_scheduled()

0 commit comments

Comments
 (0)