@@ -109,7 +109,7 @@ def __init__(self, fnparams: CBPARAMS, fn: Callable[[CBPARAMS], None], priority=
109
109
110
110
@property
111
111
def identifier (self ) -> tuple :
112
- return ( self .idn , self ._fn .__name__ )
112
+ return self .idn , self ._fn .__name__
113
113
114
114
def debugid (self )-> str :
115
115
return str ((self .idn , self ._fn .__name__ , str (self ._fnparams )))
@@ -183,12 +183,12 @@ def __eq__(self, other):
183
183
existing element, in case it was lower than the item that was about to be added.
184
184
"""
185
185
if isinstance (other , Runnable ):
186
- return ( self ._fnparams == other ._fnparams and self ._fn == other ._fn )
186
+ return self ._fnparams == other ._fnparams and self ._fn == other ._fn
187
187
else :
188
188
return False
189
189
190
190
def __ne__ (self , other ):
191
- return ( not self .__eq__ (other ) )
191
+ return not self .__eq__ (other )
192
192
193
193
def __lt__ (self , other ):
194
194
""" Order Runnable by their priority
@@ -198,7 +198,7 @@ def __lt__(self, other):
198
198
# if the priority is equal, we want to order
199
199
# by creation time to handle oldest jobs first
200
200
if self .priority == other .priority :
201
- return self ._created < other ._created
201
+ return self ._created < other ._created # noqa
202
202
203
203
return self .priority < other .priority
204
204
@@ -281,6 +281,9 @@ class JobList():
281
281
282
282
There can be multiple similar jobs (same id, same method) but with different parameters, then parameter
283
283
is a dict and not a string with only the OpenStack ID.
284
+
285
+ These calls get a dict instead of a str: {enable, disable, update]_policy_logging, address_group_update
286
+
284
287
In these cases this JobList will keep track of the jobs, merging jobs that are identical (same parameters)
285
288
into one, to prevent unneccessary re-executions, but keeping them separate otherwise to not drop them.
286
289
@@ -309,12 +312,14 @@ class JobList():
309
312
done will then choose a job from the list, that is supposed to run again,
310
313
remove it from the list and return it to the JobRerunner.
311
314
312
- This might be the same job that was just finished or it could be a different one.
315
+ This might be exactly the same job that was just finished (if it needs rerunning) or
316
+ it could be a different one, that is: same openstack id, different parameter dict.
317
+
313
318
For now we will choose the oldest one based on age, which should be the same job that
314
319
was just done, but we might want to change that so we use a helper function for that
315
- for now in the POC.
316
-
320
+ for now in the POC. The current helper uses pop() so our list basically is a FiFo.
317
321
"""
322
+
318
323
def __init__ (self ):
319
324
self ._job_identifier : Optional [str ] = None
320
325
self ._runnables : List [tuple [int , Runnable ]] = []
@@ -324,7 +329,7 @@ def __len__(self):
324
329
325
330
@property
326
331
def size (self ):
327
- return sum ( count for count , _ in self ._runnables )
332
+ return sum (count for count , _ in self ._runnables )
328
333
329
334
def empty (self ):
330
335
return self .size == 0
@@ -342,7 +347,7 @@ def add(self, job:Runnable)->bool:
342
347
343
348
See Class documentation for more details.
344
349
345
- @return: True if the same job was not present yet, False if one already existed
350
+ returns True if the same job was not present yet, False if one already existed
346
351
"""
347
352
348
353
if self ._job_identifier is not None :
@@ -354,14 +359,18 @@ def add(self, job:Runnable)->bool:
354
359
# search through our list and update the counter or append the job:
355
360
for index , (count , existing_job ) in enumerate (self ._runnables ):
356
361
if job == existing_job :
362
+ if count < 1 :
363
+ # fix the list, otherwise we would never run that job.
364
+ LOG .error ("Joblist counter for job %s is %d, indicating job should have been removed." , job , count )
365
+ count = 0
357
366
count += 1
358
367
self ._runnables [index ] = (count , existing_job )
359
- if count > 1 :
360
- # in case the list was broken and we had a job with count == 0
361
- # in it, we do not want to block and prevent the job from getting
362
- # started!
363
- return False
364
- return True
368
+ if count == 1 :
369
+ # failsafe:
370
+ # return True if this is the first time the job has been added,
371
+ # only happens at this point if our bookkeeping was off.
372
+ return True
373
+ return False
365
374
366
375
# No match found, this is the first of its kind, we can run it.
367
376
# note that a job that gets re-executed will be removed from
@@ -372,7 +381,7 @@ def add(self, job:Runnable)->bool:
372
381
return True
373
382
374
383
def _runnable_is_done (self , job :Runnable ):
375
- # search through our list and update the counter or remove the job:
384
+ """ search through our list and update the counter or remove the job """
376
385
for index , (count , existing_job ) in enumerate (self ._runnables ):
377
386
if job == existing_job :
378
387
# we do not need this job with this parameters again, it is done,
@@ -385,6 +394,8 @@ def _runnable_is_done(self, job:Runnable):
385
394
LOG .warning ("Job count in JobList was %d for %s" , count , job .debugid ())
386
395
del self ._runnables [index ]
387
396
return
397
+ # leave job in the list, so it will get retrieved again later.
398
+ # when retrieving we use pop() so the list gets cleared then.
388
399
self ._runnables [index ] = (count , existing_job )
389
400
return
390
401
@@ -394,7 +405,7 @@ def _runnable_is_done(self, job:Runnable):
394
405
395
406
396
407
def _runnable_pop_next (self ) -> Optional [Runnable ]:
397
- """ find the next job to run and remove it from the list,
408
+ """ Find the next job to run and remove it from the list,
398
409
or return None if there is None to run.
399
410
"""
400
411
if not self ._runnables :
@@ -409,11 +420,11 @@ def _runnable_pop_next(self) -> Optional[Runnable]:
409
420
return job
410
421
411
422
def done (self , job :Runnable )-> Optional [Runnable ]:
412
- """ mark a job as done
423
+ """ Mark a job as done and return next job to run.
413
424
414
425
Removes the job from the list, if there are jobs left in the list, will return the next one to run.
415
426
416
- @return: A job to run next, or None if there are no jobs left to run.
427
+ returns a job to run next, or None if there are no jobs left to run.
417
428
"""
418
429
if self ._job_identifier is not None :
419
430
if job .identifier != self ._job_identifier :
@@ -423,7 +434,7 @@ def done(self, job:Runnable)->Optional[Runnable]:
423
434
return self ._runnable_pop_next ()
424
435
425
436
def get_count (self , job ):
426
- """ get how often this specific job had been requested to run while already running"""
437
+ """ Returns how often this specific job had been requested to run while already running """
427
438
428
439
if self ._job_identifier is not None :
429
440
if job .identifier != self ._job_identifier :
@@ -452,16 +463,16 @@ class JobRerunner():
452
463
In this class we keep track of all jobs currently running in the workers.
453
464
454
465
When taking a new job from the active queue, we check if this job is
455
- already running. If this is not the case, we will add it to our book keeping
456
- here and it will be sent to a worker, wrapped in a function that will call
457
- the JobRunner on job completion to update the book keeping .
466
+ already running. If this is not the case, we will add it to our bookkeeping
467
+ here, and it will be sent to a worker, wrapped in a function that will call
468
+ the JobRunner on job completion to update the bookkeeping .
458
469
459
470
If we find the job already running, the new job will be dropped, but a counter
460
471
for the job will be increased to mark the job for re-execution once its done,
461
472
in case a change to the object had occured while the worker was running.
462
473
463
- When a worker is done with a job, it will notify us and we can either
464
- remove the id from our book keeping or, if a new job was added in the meantime,
474
+ When a worker is done with a job, it will notify us, and we can either
475
+ remove the id from our bookkeeping or, if a new job was added in the meantime,
465
476
mark it as ready for re-execution. Note that a job will intentionally only be
466
477
re-executed once, independently of how often it was added while a worker was
467
478
already running it.
@@ -495,6 +506,7 @@ def get_rerunnable(self) -> Runnable:
495
506
return job
496
507
497
508
def job_done (self , job : Runnable ):
509
+ """ Marks job as done, update bookkeeping and if needed add job to the queue for re-execution """
498
510
LOG .debug ("JobRerunner job_done called for %s" , job )
499
511
500
512
with LockManager .get_lock (self ._lname_running ):
@@ -576,7 +588,7 @@ class Runner(object):
576
588
def __init__ (self , active_size = INFINITY , passive_size = INFINITY ,
577
589
workers_size = 1 ):
578
590
# if queue_size is < 0, the queue size is infinite.
579
- self ._active = UniqFiFoQueue (active_size )
591
+ self ._active = UniqFiFoQueue (maxsize = active_size )
580
592
self ._passive = UniqPriorityQueue (maxsize = passive_size )
581
593
self ._workers = eventlet .greenpool .GreenPool (size = workers_size )
582
594
self ._idle = workers_size
0 commit comments