@@ -85,6 +85,8 @@ def start(self):
8585 """Enable executor, starting requested amount of workers
8686
8787 Workers are started always, not provisioned dynamically"""
88+ logger .debug (f"Starting { self .executor_name } with { self .nb_workers } threads" )
89+
8890 self .drain ()
8991 self ._workers : set [threading .Thread ] = set ()
9092 self .no_more = False
@@ -139,26 +141,30 @@ def drain(self):
139141
140142 def join (self ):
141143 """Await completion of workers, requesting them to stop taking new task"""
142- logger .debug (f"joining all threads for { self .executor_name } " )
144+ logger .debug (
145+ f"joining all threads for { self .executor_name } ; threads have "
146+ f"{ self .thread_deadline_sec } s to join before we give-up waiting for them"
147+ )
143148 self .no_more = True
144- for num , t in enumerate (self ._workers ):
145- deadline = datetime .datetime .now (tz = datetime .UTC ) + datetime .timedelta (
146- seconds = self .thread_deadline_sec
147- )
148- logger .debug (
149- f"Giving { self .executor_name } -{ num } { self .thread_deadline_sec } s to join"
150- )
149+ deadline = datetime .datetime .now (tz = datetime .UTC ) + datetime .timedelta (
150+ seconds = self .thread_deadline_sec
151+ )
152+ alive_threads = list (filter (lambda t : t .is_alive (), self ._workers ))
153+ for t in filter (lambda t : t not in alive_threads , self ._workers ):
154+ logger .debug (f"Thread { t .name } is already dead. Skipping…" )
155+ while (
156+ len (alive_threads ) > 0 and datetime .datetime .now (tz = datetime .UTC ) < deadline
157+ ):
151158 e = threading .Event ()
152- while t .is_alive () and datetime .datetime .now (tz = datetime .UTC ) < deadline :
153- t .join (1 )
154- e .wait (timeout = 2 )
155- if t .is_alive ():
156- logger .debug (
157- f"Thread { self .executor_name } -{ num } is not joining. Skipping…"
158- )
159- else :
160- logger .debug (f"Thread { self .executor_name } -{ num } joined" )
161- logger .debug (f"all threads joined for { self .executor_name } " )
159+ for t in alive_threads :
160+ t .join (0.1 ) # just indicate to the thread that we want to stop
161+ e .wait (2 ) # wait a bit more to let things cool down
162+ for t in filter (lambda t : not t .is_alive (), alive_threads ):
163+ logger .debug (f"Thread { t .name } joined" )
164+ alive_threads .remove (t )
165+ for t in alive_threads :
166+ logger .debug (f"Thread { t .name } never joined. Skipping…" )
167+ logger .debug (f"join completed for { self .executor_name } " )
162168
163169 def shutdown (self , * , wait = True ):
164170 """stop the executor, either somewhat immediately or awaiting completion"""
0 commit comments