@@ -149,6 +149,7 @@ def _default_kernel_buffers(self):
149149
150150 def __init__ (self , ** kwargs ):
151151 self .pinned_superclass = MultiKernelManager
152+ self ._pending_kernel_tasks = {}
152153 self .pinned_superclass .__init__ (self , ** kwargs )
153154 self .last_kernel_activity = utcnow ()
154155
@@ -216,9 +217,11 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
216217 kwargs ["kernel_id" ] = kernel_id
217218 kernel_id = await ensure_async (self .pinned_superclass .start_kernel (self , ** kwargs ))
218219 self ._kernel_connections [kernel_id ] = 0
219- fut = asyncio .ensure_future (self ._finish_kernel_start (kernel_id ))
220+ task = asyncio .create_task (self ._finish_kernel_start (kernel_id ))
220221 if not getattr (self , "use_pending_kernels" , None ):
221- await fut
222+ await task
223+ else :
224+ self ._pending_kernel_tasks [kernel_id ] = task
222225 # add busy/activity markers:
223226 kernel = self .get_kernel (kernel_id )
224227 kernel .execution_state = "starting"
@@ -245,8 +248,8 @@ async def _finish_kernel_start(self, kernel_id):
245248 if hasattr (km , "ready" ):
246249 try :
247250 await km .ready
248- except Exception :
249- self .log .exception (km . ready . exception () )
251+ except Exception as e :
252+ self .log .exception (e )
250253 return
251254
252255 self ._kernel_ports [kernel_id ] = km .ports
@@ -372,7 +375,7 @@ def stop_buffering(self, kernel_id):
372375 buffer_info = self ._kernel_buffers .pop (kernel_id )
373376 # close buffering streams
374377 for stream in buffer_info ["channels" ].values ():
375- if not stream .closed () :
378+ if not stream .socket . closed :
376379 stream .on_recv (None )
377380 stream .close ()
378381
@@ -387,13 +390,18 @@ def stop_buffering(self, kernel_id):
387390 def shutdown_kernel (self , kernel_id , now = False , restart = False ):
388391 """Shutdown a kernel by kernel_id"""
389392 self ._check_kernel_id (kernel_id )
390- self .stop_watching_activity (kernel_id )
391- self .stop_buffering (kernel_id )
392393
393394 # Decrease the metric of number of kernels
394395 # running for the relevant kernel type by 1
395396 KERNEL_CURRENTLY_RUNNING_TOTAL .labels (type = self ._kernels [kernel_id ].kernel_name ).dec ()
396397
398+ if kernel_id in self ._pending_kernel_tasks :
399+ task = self ._pending_kernel_tasks .pop (kernel_id )
400+ task .cancel ()
401+ else :
402+ self .stop_watching_activity (kernel_id )
403+ self .stop_buffering (kernel_id )
404+
397405 self .pinned_superclass .shutdown_kernel (self , kernel_id , now = now , restart = restart )
398406
399407 async def restart_kernel (self , kernel_id , now = False ):
@@ -533,7 +541,8 @@ def stop_watching_activity(self, kernel_id):
533541 """Stop watching IOPub messages on a kernel for activity."""
534542 kernel = self ._kernels [kernel_id ]
535543 if getattr (kernel , "_activity_stream" , None ):
536- kernel ._activity_stream .close ()
544+ if not kernel ._activity_stream .socket .closed :
545+ kernel ._activity_stream .close ()
537546 kernel ._activity_stream = None
538547
539548 def initialize_culler (self ):
@@ -638,19 +647,24 @@ def __init__(self, **kwargs):
638647 self .pinned_superclass = AsyncMultiKernelManager
639648 self .pinned_superclass .__init__ (self , ** kwargs )
640649 self .last_kernel_activity = utcnow ()
650+ self ._pending_kernel_tasks = {}
641651
642652 async def shutdown_kernel (self , kernel_id , now = False , restart = False ):
643653 """Shutdown a kernel by kernel_id"""
644654 self ._check_kernel_id (kernel_id )
645- self .stop_watching_activity (kernel_id )
646- self .stop_buffering (kernel_id )
647655
648656 # Decrease the metric of number of kernels
649657 # running for the relevant kernel type by 1
650658 KERNEL_CURRENTLY_RUNNING_TOTAL .labels (type = self ._kernels [kernel_id ].kernel_name ).dec ()
651659
660+ if kernel_id in self ._pending_kernel_tasks :
661+ task = self ._pending_kernel_tasks .pop (kernel_id )
662+ task .cancel ()
663+ else :
664+ self .stop_watching_activity (kernel_id )
665+ self .stop_buffering (kernel_id )
666+
652667 # Finish shutting down the kernel before clearing state to avoid a race condition.
653- ret = await self .pinned_superclass .shutdown_kernel (
668+ return await self .pinned_superclass .shutdown_kernel (
654669 self , kernel_id , now = now , restart = restart
655670 )
656- return ret
0 commit comments