42
42
import sys
43
43
import traceback
44
44
from threading import Thread
45
+ from time import sleep
45
46
from typing import Optional
46
47
47
48
# NB: Avoid relative imports so that this script can be run standalone.
@@ -53,6 +54,7 @@ class Task:
53
54
def __init__ (self , uuid : str ) -> None :
54
55
self .uuid = uuid
55
56
self .outputs = {}
57
+ self .finished = False
56
58
self .cancel_requested = False
57
59
58
60
def update (
@@ -131,10 +133,10 @@ def execute_script():
131
133
132
134
self ._report_completion ()
133
135
134
- # TODO: Consider whether to retain a reference to this Thread, and
135
- # expose a "force" option for cancelation that kills it forcibly; see:
136
- # https://www.geeksforgeeks.org/python-different-ways-to-kill-a- thread/
137
- Thread ( target = execute_script , name = f"Appose- { self .uuid } " ) .start ()
136
+ # Create a thread and save a reference to it, in case its script
137
+ # ends up killing the thread. This happens e.g. if it calls sys.exit.
138
+ self . thread = Thread ( target = execute_script , name = f"Appose- { self . uuid } " )
139
+ self .thread .start ()
138
140
139
141
def _report_launch (self ) -> None :
140
142
self ._respond (ResponseType .LAUNCH , None )
@@ -144,27 +146,55 @@ def _report_completion(self) -> None:
144
146
self ._respond (ResponseType .COMPLETION , args )
145
147
146
148
def _respond (self , response_type : ResponseType , args : Optional [Args ]) -> None :
149
+ already_terminated = False
150
+ if response_type .is_terminal ():
151
+ if self .finished :
152
+ # This is not the first terminal response. Let's
153
+ # remember, in case an exception is generated below,
154
+ # so that we can avoid infinite recursion loops.
155
+ already_terminated = True
156
+ self .finished = True
157
+
147
158
response = {"task" : self .uuid , "responseType" : response_type .value }
148
159
if args is not None :
149
160
response .update (args )
150
161
# NB: Flush is necessary to ensure service receives the data!
151
162
try :
152
163
print (encode (response ), flush = True )
153
164
except Exception :
154
- # Encoding can fail due to unsupported types, when the response
155
- # or its elements are not supported by JSON encoding.
165
+ if already_terminated :
166
+ # An exception triggered a failure response which
167
+ # then triggered another exception. Let's stop here
168
+ # to avoid the risk of infinite recursion loops.
169
+ return
170
+ # Encoding can fail due to unsupported types, when the
171
+ # response or its elements are not supported by JSON encoding.
156
172
# No matter what goes wrong, we want to tell the caller.
157
- if response_type is ResponseType .FAILURE :
158
- # TODO: How to address this hypothetical case
159
- # of a failure message triggering another failure?
160
- raise
161
173
self .fail (traceback .format_exc ())
162
174
163
175
164
176
def main () -> None :
165
177
_set_worker (True )
166
178
167
179
tasks = {}
180
+ running = True
181
+
182
+ def cleanup_threads ():
183
+ while running :
184
+ sleep (0.05 )
185
+ dead = {
186
+ uuid : task
187
+ for uuid , task in tasks .items ()
188
+ if not task .thread .is_alive ()
189
+ }
190
+ for uuid , task in dead .items ():
191
+ tasks .pop (uuid )
192
+ if not task .finished :
193
+ # The task died before reporting a terminal status.
194
+ # We report this situation as failure by thread death.
195
+ task .fail ("thread death" )
196
+
197
+ Thread (target = cleanup_threads , name = "Appose-Janitor" ).start ()
168
198
169
199
while True :
170
200
try :
@@ -193,6 +223,8 @@ def main() -> None:
193
223
continue
194
224
task .cancel_requested = True
195
225
226
+ running = False
227
+
196
228
197
229
if __name__ == "__main__" :
198
230
main ()
0 commit comments