@@ -95,10 +95,10 @@ def create_kernel_manager(*args: Any, **kwargs: Any) -> KernelManager:
9595 help = "Share a single zmq.Context to talk to all my kernels" ,
9696 ).tag (config = True )
9797
98- _created_context = Bool (False )
99-
10098 context = Instance ("zmq.Context" )
10199
100+ _created_context = Bool (False )
101+
102102 _pending_kernels = Dict ()
103103
104104 @property
@@ -111,7 +111,12 @@ def _context_default(self) -> zmq.Context:
111111 self ._created_context = True
112112 return zmq .Context ()
113113
114+ connection_dir = Unicode ("" )
115+
116+ _kernels = Dict ()
117+
114118 def __del__ (self ):
119+ """Handle garbage collection. Destroy context if applicable."""
115120 if self ._created_context and self .context and not self .context .closed :
116121 if self .log :
117122 self .log .debug ("Destroying zmq context for %s" , self )
@@ -123,10 +128,6 @@ def __del__(self):
123128 else :
124129 super_del ()
125130
126- connection_dir = Unicode ("" )
127-
128- _kernels = Dict ()
129-
130131 def list_kernel_ids (self ) -> t .List [str ]:
131132 """Return a list of the kernel ids of the active kernels."""
132133 # Create a copy so we can iterate over kernels in operations
@@ -171,17 +172,19 @@ async def _add_kernel_when_ready(
171172 try :
172173 await kernel_awaitable
173174 self ._kernels [kernel_id ] = km
174- finally :
175175 self ._pending_kernels .pop (kernel_id , None )
176+ except Exception as e :
177+ self .log .exception (e )
176178
177179 async def _remove_kernel_when_ready (
178180 self , kernel_id : str , kernel_awaitable : t .Awaitable
179181 ) -> None :
180182 try :
181183 await kernel_awaitable
182184 self .remove_kernel (kernel_id )
183- finally :
184185 self ._pending_kernels .pop (kernel_id , None )
186+ except Exception as e :
187+ self .log .exception (e )
185188
186189 def _using_pending_kernels (self ):
187190 """Returns a boolean; a clearer method for determining if
@@ -207,15 +210,15 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
207210 kwargs ['kernel_id' ] = kernel_id # Make kernel_id available to manager and provisioner
208211
209212 starter = ensure_async (km .start_kernel (** kwargs ))
210- fut = asyncio .ensure_future (self ._add_kernel_when_ready (kernel_id , km , starter ))
211- self ._pending_kernels [kernel_id ] = fut
213+ task = asyncio .create_task (self ._add_kernel_when_ready (kernel_id , km , starter ))
214+ self ._pending_kernels [kernel_id ] = task
212215 # Handling a Pending Kernel
213216 if self ._using_pending_kernels ():
214217 # If using pending kernels, do not block
215218 # on the kernel start.
216219 self ._kernels [kernel_id ] = km
217220 else :
218- await fut
221+ await task
219222 # raise an exception if one occurred during kernel startup.
220223 if km .ready .exception ():
221224 raise km .ready .exception () # type: ignore
@@ -224,22 +227,6 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
224227
225228 start_kernel = run_sync (_async_start_kernel )
226229
227- async def _shutdown_kernel_when_ready (
228- self ,
229- kernel_id : str ,
230- now : t .Optional [bool ] = False ,
231- restart : t .Optional [bool ] = False ,
232- ) -> None :
233- """Wait for a pending kernel to be ready
234- before shutting the kernel down.
235- """
236- # Only do this if using pending kernels
237- if self ._using_pending_kernels ():
238- kernel = self ._kernels [kernel_id ]
239- await kernel .ready
240- # Once out of a pending state, we can call shutdown.
241- await ensure_async (self .shutdown_kernel (kernel_id , now = now , restart = restart ))
242-
243230 async def _async_shutdown_kernel (
244231 self ,
245232 kernel_id : str ,
@@ -258,22 +245,21 @@ async def _async_shutdown_kernel(
258245 Will the kernel be restarted?
259246 """
260247 self .log .info ("Kernel shutdown: %s" % kernel_id )
261- # If we're using pending kernels, block shutdown when a kernel is pending.
262- if self ._using_pending_kernels () and kernel_id in self ._pending_kernels :
263- raise RuntimeError ("Kernel is in a pending state. Cannot shutdown." )
264248 # If the kernel is still starting, wait for it to be ready.
265- elif kernel_id in self ._pending_kernels :
266- kernel = self ._pending_kernels [kernel_id ]
249+ if kernel_id in self ._pending_kernels :
250+ task = self ._pending_kernels [kernel_id ]
267251 try :
268- await kernel
252+ await task
269253 km = self .get_kernel (kernel_id )
270254 await t .cast (asyncio .Future , km .ready )
255+ except asyncio .CancelledError :
256+ pass
271257 except Exception :
272258 self .remove_kernel (kernel_id )
273259 return
274260 km = self .get_kernel (kernel_id )
275261 # If a pending kernel raised an exception, remove it.
276- if km .ready .exception ():
262+ if not km . ready . cancelled () and km .ready .exception ():
277263 self .remove_kernel (kernel_id )
278264 return
279265 stopper = ensure_async (km .shutdown_kernel (now , restart ))
@@ -320,13 +306,19 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
320306 """Shutdown all kernels."""
321307 kids = self .list_kernel_ids ()
322308 kids += list (self ._pending_kernels )
323- futs = [ensure_async (self ._shutdown_kernel_when_ready (kid , now = now )) for kid in set (kids )]
309+ kms = list (self ._kernels .values ())
310+ futs = [ensure_async (self .shutdown_kernel (kid , now = now )) for kid in set (kids )]
324311 await asyncio .gather (* futs )
325- # When using "shutdown all", all pending kernels
326- # should be awaited before exiting this method.
312+ # If using pending kernels, the kernels will not have been fully shut down.
327313 if self ._using_pending_kernels ():
328- for km in self ._kernels .values ():
329- await km .ready
314+ for km in kms :
315+ try :
316+ await km .ready
317+ except asyncio .CancelledError :
318+ self ._pending_kernels [km .kernel_id ].cancel ()
319+ except Exception :
320+ # Will have been logged in _add_kernel_when_ready
321+ pass
330322
331323 shutdown_all = run_sync (_async_shutdown_all )
332324
0 commit comments