Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 55 additions & 48 deletions pykern/pkasyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ActionLoop:

def __init__(self):
# All these attributes must exist even after destroy()
self.destroyed = False
self.__destroyed = False
self.__lock = threading.Lock()
self.__actions = queue.Queue()
# You can join the thread to block another (e.g. main) thread from exiting
Expand All @@ -68,9 +68,7 @@ def action(self, method, arg):
"""Queue ``method`` to be called in loop thread.

Actions are methods that (by convention) begin with
``action_`` and are called sequentially inside `_start`. A
lock is used to prevent `destroy` being called during the
action and serializing activities within a single action.
``action_`` and are called sequentially inside `_start`.

Actions return ``None`` to continue on to the next
action. `_LOOP_END` should be returned to terminate `_start`
Expand Down Expand Up @@ -106,30 +104,28 @@ def action(self, method, arg):
)

def destroy(self):
"""Stops thread and calls subclass `_destroy`
"""Stops the thread.

THREADING: subclasses should not call destroy directly. They should
return `_LOOP_END` instead. External callbacks may call destroy, because
_ActionLoop does not hold lock during external callbacks.
`_start` always calls the `_handle_destroy` callback at the end of the
loop.

Subclasses should not reimplement. To handle behavior on destroy,
call `_handle_destroy`.

THREADING: subclasses and external callbacks may call destroy directly.
"""
try:
with self.__lock:
if self.destroyed:
return
self.destroyed = True
self.__actions.put_nowait((None, None))
self._destroy()
except Exception as e:
pkdlog("error={} {} stack={}", e, self, pkdexc(simplify=True))
with self.__lock:
if self.__destroyed:
return
self.__destroyed = True
self.__actions.put_nowait((None, None))

def _dispatch_action(self, method, arg):
"""Calls method with arg.

Subclasses may re-implement. This function will remain a
very simple wrapper for ``return method(arg)``.

This function is called inside the lock.

Args:
method (callable): to be called
arg (object): to be passed
Expand All @@ -144,13 +140,33 @@ def _dispatch_callback(self, callback):
Subclasses may re-implement. This method will remain a very
simple wrapper for ``callback()``.

This function is called outside the lock.

Args:
callback (callable): to be called
"""
callback()

def _handle_destroy(self):
"""Callback on destroy and loop end.

Subclasses may re-implement.

THREADING: subclasses should not call directly. This callback is
handled by the thread. Instead, call `destroy`.
"""
pass

def _handle_exception(self, exc):
"""Exception handler for `_start`.

`_handle_exception` is called when there's an exception in `_start`.

Subclasses may reimplement.

Args:
exc (Exception): Captured Exception.
"""
pass

def _on_loop_timeout(self):
"""Called when a loop timeout occurs.

Expand All @@ -159,12 +175,6 @@ def _on_loop_timeout(self):
# `__init__` prevents this from happening, but good to document.
raise NotImplementedError("ActionLoop._on_loop_timeout")

def __repr__(self):
def _destroyed():
return " DESTROYED" if self.destroyed else ""

return f"<{self.__class__.__name__}{_destroyed()} self._repr()>"

def _start(self):
"""Loops over actions and exits on `_LOOP_END` or on unhandled exception.

Expand All @@ -174,25 +184,34 @@ def _start(self):
"""
while True:
with self.__lock:
if self.destroyed:
return
if self.__destroyed:
break
try:
m, a = self.__actions.get(**self.__get_args)
self.__actions.task_done()
except queue.Empty:
m, a = self._on_loop_timeout(), None
with self.__lock:
if self.destroyed:
return
if self.__destroyed:
break
# Do not need to check m, because only invalid when destroyed is True
if (m := self._dispatch_action(m, a)) is self._LOOP_END:
return
if (m := self._dispatch_action(m, a)) is self._LOOP_END:
break
# Will be true if destroy called inside action (m)
if self.destroyed:
return
# Action returned an external callback, which must occur outside lock
with self.__lock:
if self.__destroyed:
break
# Action returned an external callback
if m:
self._dispatch_callback(m)
# Callback on destroy and loop end.
self._handle_destroy()

def __repr__(self):
def _destroyed():
return " DESTROYED" if self.__destroyed else ""

return f"<{self.__class__.__name__}{_destroyed()} self._repr()>"

def __target(self):
"""Thread's target function"""
Expand All @@ -212,18 +231,6 @@ def __target(self):
finally:
self.destroy()

def _handle_exception(self, exc):
"""Exception handler for `_start`.

`_handle_exception` is called when there's an exception in `_start`.

Subclasses may reimplement.

Args:
exc (Exception): Captured Exception.
"""
pass


class Loop:
"""HTTP Server loop"""
Expand Down