@@ -213,9 +213,9 @@ def __repr__(self):
213
213
214
214
215
215
class JobRerunner ():
216
- """ Thread save data structure to reschedule jobs when it is already running
216
+ """ Thread save data structure to reschedule jobs when they are already running
217
217
218
- When a job is retrieved from the active queue and running in a worker thread,
218
+ When a job is retrieved from the active queue and already running in a worker thread,
219
219
there is a chance that another job for the same object is added to the active
220
220
queue and also started in a worker. While there then is some locking happening
221
221
that should prevent race conditions, this still blocks the worker thread(s),
@@ -245,28 +245,32 @@ class JobRerunner():
245
245
to pick up changes quickly.
246
246
"""
247
247
248
+ _lname_running = 'JobRerunner-running'
249
+ _lname_torerun = 'JobRerunner-torerun'
250
+ _lname_statdct = 'JobRerunner-statdct'
251
+
248
252
def __init__ (self ):
249
- self ._ready = collections .deque ()
250
253
self ._running = dict ()
254
+ self ._to_rerun = collections .deque ()
251
255
self ._stats = dict ()
252
256
253
- def pop (self ) -> Runnable :
257
+ def get_rerunnable (self ) -> Runnable :
254
258
# Let's also use the LockManager that is used for locking
255
259
# in the code already. A simpler lock might do fine, but
256
260
# for the LockManager we know it is working.
257
261
258
262
259
- with LockManager .get_lock ("JobRerunner-deque" ):
263
+ with LockManager .get_lock (self . _lname_torerun ):
260
264
try :
261
- job = self ._ready .popleft ()
265
+ job = self ._to_rerun .popleft ()
262
266
LOG .debug ("JobRerunner had rerunnable job: %s" , job )
263
267
except IndexError :
264
268
job = None
265
269
LOG .debug ("JobRerunner had no rerunnable job" )
266
270
267
271
if job :
268
272
# no nested locks if not necessary
269
- with LockManager .get_lock ("JobRerunner-stats" ):
273
+ with LockManager .get_lock (self . _lname_statdct ):
270
274
stats = self ._stats .get (job )
271
275
if stats is not None :
272
276
stats .restarted ()
@@ -276,7 +280,7 @@ def pop(self) -> Runnable:
276
280
277
281
def job_done (self , job : Runnable ):
278
282
LOG .debug ("JobRerunner job_done called for %s" , job )
279
- with LockManager .get_lock ("JobRerunner-running" ):
283
+ with LockManager .get_lock (self . _lname_running ):
280
284
count = self ._running .get (job , 0 )
281
285
282
286
if count == 1 :
@@ -290,12 +294,12 @@ def job_done(self, job: Runnable):
290
294
# re-appear and so we can forget about the counter.
291
295
LOG .debug ("JobRerunner job %s is done, %d reruns requested, marking it for re-execution" , job , count )
292
296
del self ._running [job ]
293
- with LockManager .get_lock ("JobRerunner-deque" ):
294
- self ._ready .append (job )
297
+ with LockManager .get_lock (self . _lname_torerun ):
298
+ self ._to_rerun .append (job )
295
299
else :
296
300
LOG .warning ("JobRerunner job_done called too often for job %s" , job )
297
301
298
- with LockManager .get_lock ("JobRerunner-stats" ):
302
+ with LockManager .get_lock (self . _lname_statdct ):
299
303
stats = self ._stats .get (job )
300
304
if stats is not None :
301
305
stats .done ()
@@ -313,7 +317,7 @@ def add_job(self, job: Runnable) -> bool:
313
317
returns False if the job is already running and was marked for re-execution
314
318
315
319
"""
316
- with LockManager .get_lock ("JobRerunner-running" ):
320
+ with LockManager .get_lock (self . _lname_running ):
317
321
count = self ._running .get (job , 0 )
318
322
if count > 0 :
319
323
count += 1
@@ -327,15 +331,15 @@ def add_job(self, job: Runnable) -> bool:
327
331
# let's log these 2 with info for debugging, they should be sufficient in prod
328
332
# to find issues with the JobRerunner:
329
333
LOG .info ("JobRerunner stat: %d jobs waiting, total submission count: %d" , len (self ._running ), sum )
330
- LOG .info ("JobRerunner stat: %d jobs ready for re-execution" , len (self ._ready ))
334
+ LOG .info ("JobRerunner stat: %d jobs ready for re-execution" , len (self ._to_rerun ))
331
335
return False
332
336
333
337
# no job running, we can run the job
334
338
self ._running [job ] = 1
335
339
336
340
LOG .debug ("JobRerunner no identical job is currently running, can start %s" , job )
337
341
338
- with LockManager .get_lock ("JobRerunner-stats" ):
342
+ with LockManager .get_lock (self . _lname_statdct ):
339
343
stats = self ._stats .get (job )
340
344
self ._stats [job ] = JobStats (str (job ))
341
345
@@ -404,11 +408,11 @@ def _start(self):
404
408
if self .active () < self ._idle and self .passive () > 0 :
405
409
self ._active .put_nowait (self ._passive .get_nowait ())
406
410
self ._passive .task_done ()
407
- job = self . _rerunner . pop ()
408
- from_queue = False
411
+ pulled_from_queue = False
412
+ job = self . _rerunner . get_rerunnable ()
409
413
if not job :
410
414
job = self ._active .get (block = True , timeout = TIMEOUT )
411
- from_queue = True
415
+ pulled_from_queue = True
412
416
413
417
# check if we are allowed to run it,
414
418
# if yes mark it as running and spawn it
@@ -424,7 +428,7 @@ def wrap(rerun, ajob):
424
428
425
429
self ._workers .spawn (wrap , self ._rerunner , job )
426
430
427
- if from_queue :
431
+ if pulled_from_queue :
428
432
self ._active .task_done ()
429
433
except eventlet .queue .Empty :
430
434
LOG .info ("No activity for the last {} seconds." .format (TIMEOUT ))
0 commit comments