
Commit 1358eec

sven-rosenzweig authored and mutax committed
Reworking scheduling of jobs to runners
The main change in this PR is a modification of the way jobs are handled in the NSX-T agent. Please see the JobRerunner class for an in-depth explanation of the changes.

Before this commit, jobs were added to one of two queues, called active and passive. The active queue contains all requests coming in via API calls, while the passive queue is filled with maintenance and resync jobs. Both queues used to be priority queues that allow each element to be added only once. Jobs were taken from the active queue until it was empty; then jobs from the passive queue were moved over to the active queue. Jobs taken from the active queue were submitted to a worker pool, allowing up to 40 greenthreads to run jobs concurrently. However, to avoid race conditions, only one job is allowed to run per OpenStack ID. If more than one job for the same ID is scheduled on the pool, the additional jobs wait on a lock and block their worker threads until the first job is done. This means the agent can be blocked and appear fully occupied, handling 40 tasks simultaneously, while in reality most or all tasks are waiting for each other.

Instead of scheduling all jobs to the worker pool immediately and risking such a lock-up, we now first check whether the same job is already running. If it is, we rerun the job after it has finished and schedule another job that can actually run on a worker instead. The rerun is necessary because a job can run for several seconds and new API requests may arrive during that time. With this change we also avoid rerunning the same job more than once when additional requests arrive while the job is already marked for re-execution. (A simplified sketch of this bookkeeping follows right after this message.)

Additional fixes and enhancements in this commit:

UniqPriorityQueue:
- fix add(): If a job is about to be added a second time, but with a higher priority, the job is correctly not added again, but the priority of the existing job was not updated. This means jobs from the passive queue, which have a lower priority, would always be executed last, even if a high-priority job for the same object arrived via an API call. We also changed the active queue to a FIFO, to prevent passive jobs from never getting executed and to keep the execution order of API calls where possible. With the fix in place, however, we can switch back to a priority queue if needed. (A standalone sketch of the priority update appears after the diff.)

Runnable:
- fix __hash__() and make __repr__ more verbose: The Runnable class did not follow the requirement that objects which compare equal must also have the same hash value. Also, the Runnable only took the OpenStack ID into account, not the name of the function. As a result a Runnable could, for example, not be used reliably as a key in dictionaries.
- __repr__: updated to include the name of the function, so the logs show what kind of update is being executed.
- __lt__: order Runnables with the same priority by age, preventing jobs from overtaking each other.
- add timing info for logging: We currently do not get good information about the timings or basic stats of running jobs. This commit adds timing info to Runnable and a method to extract it as a string for logging.
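The rerun bookkeeping described above boils down to a counter per job identity: the first submission runs, later duplicates only bump the counter, and at most one rerun is queued when the running job finishes. The following is a minimal sketch of that idea only, not the actual JobRerunner from the diff below (which uses eventlet and the project's LockManager); the RerunTracker name and the plain threading.Lock are illustrative assumptions.

    # Simplified illustration of the run-or-mark-for-rerun idea; not the agent code.
    import threading


    class RerunTracker:
        def __init__(self):
            self._lock = threading.Lock()
            self._running = {}   # job identity -> number of submissions seen
            self._to_rerun = []  # identities ready to be executed once more

        def add_job(self, ident):
            """Return True if the job may start now, False if it was only marked."""
            with self._lock:
                if self._running.get(ident, 0) <= 0:
                    self._running[ident] = 1
                    return True
                # Already running: remember that at least one more run is wanted.
                self._running[ident] += 1
                return False

        def job_done(self, ident):
            """Called when a worker finishes; queue at most one rerun."""
            with self._lock:
                count = self._running.pop(ident, 0)
                if count > 1:
                    self._to_rerun.append(ident)


    tracker = RerunTracker()
    assert tracker.add_job(("port-1", "update")) is True    # first submission runs
    assert tracker.add_job(("port-1", "update")) is False   # duplicate is only marked
    assert tracker.add_job(("port-1", "update")) is False   # further duplicates collapse
    tracker.job_done(("port-1", "update"))
    assert tracker._to_rerun == [("port-1", "update")]      # exactly one rerun queued

However many duplicates arrive while a job is running, exactly one rerun ends up in the queue, which is the collapsing behaviour the message above describes.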
1 parent 4d7abed commit 1358eec

File tree

2 files changed: +458 −14 lines changed


networking_nsxv3/common/synchronization.py

Lines changed: 276 additions & 14 deletions
@@ -5,6 +5,7 @@
 eventlet.monkey_patch()

 import networking_nsxv3.prometheus.exporter as EXPORTER
+from networking_nsxv3.common.locking import LockManager
 from oslo_log import log as logging
 from oslo_config import cfg
 import enum
@@ -71,9 +72,69 @@ def __init__(self, idn, fn, priority=Priority.LOWEST):
         self.priority = priority
         self.idn = idn
         self.fn = fn
+        self._runcount = 0
+        self._created = time.time()
+        self._scheduled = None
+        self._started = None
+        self._jobdone = None
+        self._rescheduled = None
+
+    @property
+    def identifier(self) -> tuple:
+        return (self.idn, self.fn.__name__)
+
+    def set_scheduled(self):
+        """ called when we submit the job to the worker pool """
+        self._scheduled = time.time()
+        self._started = None
+        self._jobdone = None
+        self._rescheduled = None
+
+    def set_start(self):
+        """ called in our wrapper when we actually start fn """
+        # we need to reset the other timings
+        # because we use the same job for rerunning
+        self._started = time.time()
+        self._jobdone = None
+        self._rescheduled = None
+        self._runcount += 1
+
+    def set_done(self):
+        """ called in our wrapper when fn returns """
+        self._jobdone = time.time()
+        self._rescheduled = None
+
+    def set_rescheduled(self):
+        """ called when the job is taken out of the rerun-queue """
+        self._rescheduled = time.time()
+
+    def get_statline(self) -> str:
+
+        age = f"{time.time() - self._created:0.4f}"
+
+        scheduled = '-'
+        started = '-'
+        runtime = '-'
+        rescheduled = '-'
+
+        if self._scheduled and self._created:
+            scheduled = f"{self._scheduled - self._created:0.4f}"
+
+        if self._started and self._scheduled:
+            started = f"{self._started - self._scheduled:0.4f}"
+
+        if self._jobdone and self._started:
+            runtime = f"{self._jobdone - self._started:0.4f}"
+
+        if self._rescheduled and self._jobdone:
+            rescheduled = f"{self._rescheduled - self._jobdone:0.4f}"
+
+        return (f"timings for job {self} - runcount: {self._runcount} age: {age} "
+                f"scheduled: {scheduled} started: {started} runtime: {runtime} rescheduled: {rescheduled}")

     def __repr__(self):
-        return str(self.idn)
+        # lets not just use the object id, maybe
+        return str(self.identifier)

     def __eq__(self, other):
         """
@@ -90,25 +151,204 @@ def __ne__(self, other):
         return (not self.__eq__(other))

     def __lt__(self, other):
-        """ Order Runnable by their priority """
+        """ Order Runnable by their priority
+        Only the passive queue is ordered by priority.
+        The active queue is FiFo.
+        """
+        # if the priority is equal, we want to order
+        # by creation time to handle oldest jobs first
+        if self.priority == other.priority:
+            return self._created < other._created
+
         return self.priority < other.priority

     def __hash__(self):
-        return hash(self.__repr__())
+        # with the original repr this is broken, as __eq__ takes the fn into
+        # account as well!
+        return hash((self.idn, self.fn))
+
+
+class UniqFiFoQueue(eventlet.queue.Queue):
+    """
+    A subclass of :class:`Queue` that maintains job order by insertion.
+
+    Problem with the old approach:
+    Jobs may starve in the active queue.
+
+    - The internal sync run adds up to 20 "outdated" objects to the passive queue.
+    - If space is available, jobs move from the passive to the active queue.
+    - The active queue prioritizes jobs based on priority while enforcing uniqueness
+      (determined by OpenStack ID and execution method).
+
+    Issues:
+    1. Job starvation – If high-priority jobs keep arriving and the agent is at full capacity,
+       low-priority jobs may never get processed.
+    2. Blocking of new high-priority jobs – Lower-priority jobs in the passive queue
+       can prevent new high-priority jobs from being added due to the uniqueness constraint
+       (see the `__eq__` method of the `Runnable` class).
+    3. Adding a job with the **HIGHEST** priority does not
+       guarantee execution in insertion order. See
+       https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes
+       for details.
+    """
+
+    def _init(self, maxsize):
+        self.queue = collections.deque()
+
+    def _put(self, item):
+        if item not in self.queue:
+            # Add item to the right side of the deque
+            self.queue.append(item)
+            self._put_bookkeeping()
+        else:
+            LOG.info("Not adding item %s to fifo queue, already present!", item)
+
+    def _get(self):
+        return self.queue.popleft()


 class UniqPriorityQueue(eventlet.queue.Queue):

     def _init(self, maxsize):
         self.queue = []

-    def _put(self, item, heappush=heapq.heappush):
-        if item not in self.queue:
-            heappush(self.queue, item)
+    def _put(self, item):
+
+        try:
+            x = self.queue[self.queue.index(item)]
+            # if the prio of the new item is higher (smaller value)
+            # update the prio of the existing job and repair the heap
+            if item.priority < x.priority:
+                LOG.debug("Not adding item %s to prio queue, already present, but updating prio %s -> %s",
+                          item, x.priority, item.priority)
+                x.priority = item.priority
+                heapq.heapify(self.queue)
+            else:
+                LOG.debug("Not adding item %s to prio queue, already present!", item)
+            return
+        except ValueError:
+            # item is not in list
+            pass
+
+        # item not found, add it
+        heapq.heappush(self.queue, item)
         self._put_bookkeeping()

-    def _get(self, heappop=heapq.heappop):
-        return heappop(self.queue)
+    def _get(self):
+        return heapq.heappop(self.queue)
+
+
+class JobRerunner():
+    """ Thread-safe data structure to reschedule jobs when they are already running
+
+    When a job is retrieved from the active queue and already running in a worker thread,
+    there is a chance that another job for the same object is added to the active
+    queue and also started in a worker. While there then is some locking happening
+    that should prevent race conditions, this still blocks the worker thread(s),
+    which degrades performance and re-executes the job unnecessarily often.
+
+    To prevent this, we use this JobRerunner:
+
+    In this class we keep track of all jobs currently running in the workers.
+
+    When taking a new job from the active queue, we check if this job is
+    already running. If this is not the case, we will add it to our bookkeeping
+    here and it will be sent to a worker, wrapped in a function that will call
+    the JobRerunner on job completion to update the bookkeeping.
+
+    If we find the job already running, the new job will be dropped, but a counter
+    for the job will be increased to mark the job for re-execution once it is done,
+    in case a change to the object had occurred while the worker was running.
+
+    When a worker is done with a job, it will notify us and we can either
+    remove the id from our bookkeeping or, if a new job was added in the meantime,
+    mark it as ready for re-execution. Note that a job will intentionally only be
+    re-executed once, independently of how often it was added while a worker was
+    already running it.
+
+    Before taking a new job from the active queue in the runner, we first check in
+    the JobRerunner for a ready job, making sure that these jobs are prioritized
+    to pick up changes quickly.
+    """
+
+    _lname_running = 'JobRerunner-running'
+    _lname_torerun = 'JobRerunner-torerun'
+
+    def __init__(self):
+        self._running = dict()
+        self._to_rerun = collections.deque()
+
+    def get_rerunnable(self) -> Runnable:
+        # Let's also use the LockManager that is used for locking
+        # in the code already. A simpler lock might do fine, but
+        # for the LockManager we know it is working.
+
+        with LockManager.get_lock(self._lname_torerun):
+            try:
+                job = self._to_rerun.popleft()
+                job.set_rescheduled()
+                LOG.info("JobRerunner (about to rerun) %s", job.get_statline())
+            except IndexError:
+                job = None
+                LOG.debug("JobRerunner had no rerunnable job")
+
+        return job
+
+    def job_done(self, job: Runnable):
+        LOG.debug("JobRerunner job_done called for %s", job)
+        with LockManager.get_lock(self._lname_running):
+            count = self._running.get(job, 0)
+
+            if count == 1:
+                del self._running[job]
+                LOG.info("JobRerunner (done, no reruns requested) %s", job.get_statline())
+            elif count > 1:
+                # we only allow exactly one job to run at a time,
+                # all jobs arriving later will increase the counter while
+                # the job is still running or they get re-queued.
+                # if a job is in the ready deque it will at some point
+                # re-appear and so we can forget about the counter.
+                LOG.info("JobRerunner (done, %d reruns requested) %s", count - 1, job.get_statline())
+                del self._running[job]
+                with LockManager.get_lock(self._lname_torerun):
+                    self._to_rerun.append(job)
+            else:
+                # prevent the error from spreading
+                del self._running[job]
+                LOG.warning("JobRerunner job_done called too often for job %s", job)
+
+    def add_job(self, job: Runnable) -> bool:
+        """ Add job to list of jobs running/to be started or mark it for re-execution
+
+        returns True if the job is currently not running and should
+        be scheduled next / added to the workers.
+
+        returns False if the job is already running and was marked for re-execution
+        """
+        with LockManager.get_lock(self._lname_running):
+            count = self._running.get(job, 0)
+            if count <= 0:
+                # no job running, we can run the job
+                # if the counter is < 0 our accounting is wrong,
+                # so we fix it and run the job.
+                self._running[job] = 1
+                LOG.debug("JobRerunner no identical job is currently running, can start %s", job)
+                return True
+
+            self._running[job] = count + 1
+            LOG.debug("JobRerunner job %s already running, marked for rescheduling, count: %d ", job, count)
+
+            sum = 0
+            for job, scount in self._running.items():
+                sum += scount
+                LOG.debug("JobRerunner stat: job %s is running, submission count: %d", job, scount)
+
+            # let's log these as info for debugging, they should be sufficient in prod
+            # to find issues with the JobRerunner:
+            LOG.info("JobRerunner stat: %d jobs tracked, total submission count: %d, ready for re-execution: %d",
+                     len(self._running), sum, len(self._to_rerun))
+            return False


 class Runner(object):
@@ -129,11 +369,12 @@ class Runner(object):
     def __init__(self, active_size=INFINITY, passive_size=INFINITY,
                  workers_size=1):
         # if queue_size is < 0, the queue size is infinite.
-        self._active = UniqPriorityQueue(maxsize=active_size)
+        self._active = UniqFiFoQueue(active_size)
         self._passive = UniqPriorityQueue(maxsize=passive_size)
         self._workers = eventlet.greenpool.GreenPool(size=workers_size)
         self._idle = workers_size
         self._state = "not started"
+        self._rerunner = JobRerunner()

     def run(self, priority, ids, fn):
         """ Submit a job with priority
@@ -150,7 +391,7 @@ def run(self, priority, ids, fn):

         for jid in ids:
             try:
-                LOG.info(MESSAGE.format("Enqueued", jid, priority.name, fn.__name__))
+                LOG.info(MESSAGE.format("About to enqueue", jid, priority.name, fn.__name__))

                 job = Runnable(jid, fn, priority.value)
                 if priority.value == Priority.HIGHEST:
@@ -169,10 +410,31 @@ def _start(self):
                 if self.active() < self._idle and self.passive() > 0:
                     self._active.put_nowait(self._passive.get_nowait())
                     self._passive.task_done()
-                job = self._active.get(block=True, timeout=TIMEOUT)
-                LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__))
-                self._workers.spawn(job.fn, job.idn)#.wait()
-                self._active.task_done()
+                pulled_from_queue = False
+                job = self._rerunner.get_rerunnable()
+                if not job:
+                    job = self._active.get(block=True, timeout=TIMEOUT)
+                    pulled_from_queue = True
+
+                # check if we are allowed to run it,
+                # if yes mark it as running and spawn it
+                if self._rerunner.add_job(job):
+                    LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__))
+
+                    # ideally we would be able to add a callback to the
+                    # greenthread, but this is hidden in the pool, so
+                    # let's wrap the function once more.
+                    def wrap(rerun, ajob):
+                        ajob.set_start()
+                        ajob.fn(ajob.idn)
+                        ajob.set_done()
+                        rerun.job_done(ajob)
+
+                    job.set_scheduled()
+                    self._workers.spawn(wrap, self._rerunner, job)
+
+                if pulled_from_queue:
+                    self._active.task_done()
             except eventlet.queue.Empty:
                 LOG.info("No activity for the last {} seconds.".format(TIMEOUT))
                 LOG.info("Sizes Queue[Active=%s, Passive=%s], Jobs=%s",

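For reference on the UniqPriorityQueue._put() change shown above: when a duplicate item arrives with a higher priority (a smaller value), the priority of the already-queued item is updated in place and the heap invariant is repaired with heapify(). The sketch below illustrates only that mechanism with plain heapq; the Item class and the put() helper are made up for this example and are not part of the agent code.

    import heapq


    class Item:
        """Made-up stand-in for Runnable: identity by name, ordered by priority."""
        def __init__(self, name, priority):
            self.name = name
            self.priority = priority

        def __eq__(self, other):
            return self.name == other.name

        def __lt__(self, other):
            # smaller number == higher priority, as in the agent
            return self.priority < other.priority


    def put(queue, item):
        """Sketch of the fixed _put(): update a duplicate's priority, else push."""
        try:
            existing = queue[queue.index(item)]     # lookup uses __eq__, i.e. identity only
            if item.priority < existing.priority:
                existing.priority = item.priority   # take over the more urgent priority
                heapq.heapify(queue)                # repair the heap after mutating a key
            return
        except ValueError:
            heapq.heappush(queue, item)             # not queued yet, add it


    queue = []
    put(queue, Item("maintenance-job", priority=7))
    put(queue, Item("api-job", priority=5))
    put(queue, Item("maintenance-job", priority=1))   # duplicate, but more urgent now
    top = heapq.heappop(queue)
    assert top.name == "maintenance-job" and top.priority == 1

Before the fix, such a duplicate was dropped without the priority update, so a job that had become urgent kept waiting behind lower-priority work; mutating the key without the heapify() call could in turn leave the heap out of order.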