@@ -177,6 +177,39 @@ def _put(self, item):
177
177
def _get(self):
    """Remove and return the smallest entry of the heap stored in ``self.queue``."""
    heap = self.queue
    smallest = heapq.heappop(heap)
    return smallest
179
179
180
class JobStats:
    """Timing record for a single job, collected for debugging only.

    This may be ripped out again, so it is deliberately not embedded in
    Runnable or in the _running dict.

    Three wall-clock timestamps (``time.time()``) are tracked:

    * creation of the record (set in ``__init__``),
    * the moment ``done()`` was called,
    * the moment ``restarted()`` was called.
    """

    def __init__(self, jid: str):
        """Create the record for job id *jid* and stamp the creation time."""
        self._jobid = jid
        self._created = time.time()
        self._done = None
        self._popped = None

    def done(self):
        """Stamp the time at which the job was reported as done."""
        self._done = time.time()

    def restarted(self):
        """Stamp the time at which the job was handed out for a restart."""
        self._popped = time.time()

    def __repr__(self):
        """Return a one-line summary; timings not yet available show as '-'."""
        now = time.time()
        age = now - self._created
        runtime = None
        restart = None

        if self._done is not None:
            runtime = f"{self._done - self._created:0.4f} "

            # The wait-for-restart interval is measured from the done
            # timestamp, so it only exists once both stamps are set.
            # Without this guard, restarted() before done() made the
            # subtraction fail with a TypeError (float - None).
            if self._popped is not None:
                restart = f"{self._popped - self._done:0.4f} "

        return f"job {self._jobid} age: {age:0.4f} s time until done: {runtime or '-'} wait for restart: {restart or '-'} "
212
+
180
213
181
214
class JobRerunner ():
182
215
""" Thread save data structure to reschedule jobs when it is already running
@@ -214,22 +247,40 @@ class JobRerunner():
214
247
def __init__(self):
    """Start with no ready jobs, no running jobs and no stats records."""
    # jobs queued for re-execution, consumed from the left by pop()
    self._ready = collections.deque()
    # per-job counter, read and written by add_job() / job_done()
    self._running = {}
    # per-job JobStats records used for debugging
    self._stats = {}
217
251
218
252
def pop(self) -> Runnable:
    """Take the next job that is ready for re-execution.

    Returns the leftmost entry of the ready deque, or None when the deque
    is empty. If a stats record exists for the returned job, it is marked
    as restarted, logged and then dropped.
    """
    # The LockManager is reused here because it is already used for
    # locking elsewhere in the code and is known to work; a simpler lock
    # might do just as well.
    with LockManager.get_lock("JobRerunner-deque"):
        try:
            rerun = self._ready.popleft()
        except IndexError:
            rerun = None
            LOG.debug("JobRerunner had no rerunnable job")
        else:
            LOG.debug("JobRerunner had rerunnable job: %s", rerun)

    if not rerun:
        return rerun

    # The stats lock is taken only after the deque lock has been released,
    # so the two locks are never nested.
    with LockManager.get_lock("JobRerunner-stats"):
        record = self._stats.get(rerun)
        if record is not None:
            record.restarted()
            LOG.debug("JobRerunner stats: %s", record)
            del self._stats[rerun]

    return rerun
231
275
232
276
def job_done (self , job : Runnable ):
277
+
278
+ with LockManager .get_lock ("JobRerunner-stats" ):
279
+ stats = self ._stats .get (job )
280
+ if stats is not None :
281
+ stats .done ()
282
+ LOG .info ("JobRerunner stats: %s" , stats )
283
+
233
284
LOG .debug ("JobRerunner job_done called for %s" , job )
234
285
with LockManager .get_lock ("JobRerunner-running" ):
235
286
count = self ._running .get (job , 0 )
@@ -261,12 +312,7 @@ def add_job(self, job: Runnable) -> bool:
261
312
"""
262
313
with LockManager .get_lock ("JobRerunner-running" ):
263
314
count = self ._running .get (job , 0 )
264
- if count == 0 :
265
- # no job running, we can run the job
266
- self ._running [job ] = 1
267
- LOG .debug ("JobRerunner no identical job is currently running, can start %s" , job )
268
- return True
269
- elif count > 0 :
315
+ if count > 0 :
270
316
count += 1
271
317
self ._running [job ] = count
272
318
LOG .debug ("JobRerunner job %s already running, marked for rescheduling, count: %d " , job , count )
@@ -279,8 +325,21 @@ def add_job(self, job: Runnable) -> bool:
279
325
# to find issues with the JobRerunner:
280
326
LOG .info ("JobRerunner stat: %d jobs waiting, total submission count: %d" , len (self ._running ), sum )
281
327
LOG .info ("JobRerunner stat: %d jobs ready for re-execution" , len (self ._ready ))
328
+ return False
282
329
283
- return False
330
+ # no job running, we can run the job
331
+ self ._running [job ] = 1
332
+
333
+ LOG .debug ("JobRerunner no identical job is currently running, can start %s" , job )
334
+
335
+ with LockManager .get_lock ("JobRerunner-stats" ):
336
+ stats = self ._stats .get (job )
337
+ self ._stats [job ] = JobStats (str (job ))
338
+
339
+ if stats :
340
+ LOG .error ("JobRerunner stats should not exist when adding a new job: %s" , stats )
341
+
342
+ return True
284
343
285
344
286
345
class Runner (object ):
0 commit comments