@@ -216,9 +216,9 @@ def _get(self):
216
216
217
217
218
218
class JobRerunner ():
219
- """ Thread save data structure to reschedule jobs when it is already running
219
+ """ Thread-safe data structure to reschedule jobs when they are already running
220
220
221
- When a job is retrieved from the active queue and running in a worker thread,
221
+ When a job is retrieved from the active queue and already running in a worker thread,
222
222
there is a chance that another job for the same object is added to the active
223
223
queue and also started in a worker. While there then is some locking happening
224
224
that should prevent race conditions, this still blocks the worker thread(s),
@@ -248,18 +248,21 @@ class JobRerunner():
248
248
to pick up changes quickly.
249
249
"""
250
250
251
+ _lname_running = 'JobRerunner-running'
252
+ _lname_torerun = 'JobRerunner-torerun'
253
+
251
254
def __init__ (self ):
252
- self ._ready = collections .deque ()
253
255
self ._running = dict ()
256
+ self ._to_rerun = collections .deque ()
254
257
255
- def pop (self ) -> Runnable :
258
+ def get_rerunnable (self ) -> Runnable :
256
259
# Let's also use the LockManager that is used for locking
257
260
# in the code already. A simpler lock might do fine, but
258
261
# for the LockManager we know it is working.
259
262
260
- with LockManager .get_lock ("JobRerunner-deque" ):
263
+ with LockManager .get_lock (self . _lname_torerun ):
261
264
try :
262
- job = self ._ready .popleft ()
265
+ job = self ._to_rerun .popleft ()
263
266
LOG .debug ("JobRerunner had rerunnable job: %s" , job )
264
267
LOG .info ("JobRerunner (rerun) %s" , job .get_statline ())
265
268
except IndexError :
@@ -270,7 +273,7 @@ def pop(self) -> Runnable:
270
273
271
274
def job_done (self , job : Runnable ):
272
275
LOG .debug ("JobRerunner job_done called for %s" , job )
273
- with LockManager .get_lock ("JobRerunner-running" ):
276
+ with LockManager .get_lock (self . _lname_running ):
274
277
count = self ._running .get (job , 0 )
275
278
276
279
if count == 1 :
@@ -286,8 +289,8 @@ def job_done(self, job: Runnable):
286
289
LOG .info ("JobRerunner (requeue) %s" , job .get_statline ())
287
290
LOG .debug ("JobRerunner job %s is done, %d reruns requested, marking it for re-execution" , job , count )
288
291
del self ._running [job ]
289
- with LockManager .get_lock ("JobRerunner-deque" ):
290
- self ._ready .append (job )
292
+ with LockManager .get_lock (self . _lname_torerun ):
293
+ self ._to_rerun .append (job )
291
294
else :
292
295
# prevent the error from spreading
293
296
del self ._running [job ]
@@ -302,7 +305,7 @@ def add_job(self, job: Runnable) -> bool:
302
305
returns False if the job is already running and was marked for re-execution
303
306
304
307
"""
305
- with LockManager .get_lock ("JobRerunner-running" ):
308
+ with LockManager .get_lock (self . _lname_running ):
306
309
count = self ._running .get (job , 0 )
307
310
if count <= 0 :
308
311
# no job running, we can run the job
@@ -323,7 +326,7 @@ def add_job(self, job: Runnable) -> bool:
323
326
# let's log these as info for debugging, they should be sufficient in prod
324
327
# to find issues with the JobRerunner:
325
328
LOG .info ("JobRerunner stat: %d jobs waiting, total submission count: %d" , len (self ._running ), sum )
326
- LOG .info ("JobRerunner stat: %d jobs ready for re-execution" , len (self ._ready ))
329
+ LOG .info ("JobRerunner stat: %d jobs ready for re-execution" , len (self ._to_rerun ))
327
330
return False
328
331
329
332
@@ -386,11 +389,11 @@ def _start(self):
386
389
if self .active () < self ._idle and self .passive () > 0 :
387
390
self ._active .put_nowait (self ._passive .get_nowait ())
388
391
self ._passive .task_done ()
389
- job = self . _rerunner . pop ()
390
- from_queue = False
392
+ pulled_from_queue = False
393
+ job = self . _rerunner . get_rerunnable ()
391
394
if not job :
392
395
job = self ._active .get (block = True , timeout = TIMEOUT )
393
- from_queue = True
396
+ pulled_from_queue = True
394
397
395
398
# check if we are allowed to run it,
396
399
# if yes mark it as running and spawn it
@@ -408,7 +411,7 @@ def wrap(rerun, ajob):
408
411
409
412
self ._workers .spawn (wrap , self ._rerunner , job )
410
413
411
- if from_queue :
414
+ if pulled_from_queue :
412
415
self ._active .task_done ()
413
416
except eventlet .queue .Empty :
414
417
LOG .info ("No activity for the last {} seconds." .format (TIMEOUT ))
0 commit comments