From 5fc8fa5f1d5c9fe49d09217cf52ba95004f3bdb8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 May 2023 14:31:06 -0400 Subject: [PATCH 1/8] Initial prototype for a one-cancels-one style supervisor, nursery thing.. --- tractor/trionics/_supervisor.py | 256 ++++++++++++++++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100644 tractor/trionics/_supervisor.py diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py new file mode 100644 index 00000000..faeb7616 --- /dev/null +++ b/tractor/trionics/_supervisor.py @@ -0,0 +1,256 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Erlang-style (ish) "one-cancels-one" nursery. + +''' +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, + nullcontext, +) +from typing import ContextManager + +from outcome import ( + Outcome, + acapture, +) +import pdbp +from msgspec import Struct +import trio +from trio._core._run import ( + Task, + CancelScope, + Nursery, +) + +class MaybeOutcome(Struct): + + _ready: Event = trio.Event() + _outcome: Outcome | None = None + _result: Any | None = None + + @property + def result(self) -> Any: + ''' + Either Any or None depending on whether the Outcome has compeleted. + + ''' + if self._outcome is None: + raise RuntimeError( + # f'Task {task.name} is not complete.\n' + f'Outcome is not complete.\n' + 'wait on `await MaybeOutcome.unwrap()` first!' + ) + return self._result + + def _set_outcome( + self, + outcome: Outcome, + ): + self._outcome = outcome + self._result = outcome.unwrap() + self._ready.set() + + # TODO: maybe a better name like, + # - .wait_and_unwrap() + # - .wait_unwrap() + # - .aunwrap() ? + async def unwrap(self) -> Any: + if self._ready.is_set(): + return self._result + + await self._ready.wait() + + out = self._outcome + if out is None: + raise ValueError(f'{out} is not an outcome!?') + + return self.result + + +class TaskHandle(Struct): + task: Task + cs: CancelScope + exited: Event | None = None + _outcome: Outcome | None = None + + +class ScopePerTaskNursery(Struct): + _n: Nursery + _scopes: dict[ + Task, + tuple[CancelScope, Outcome] + ] = {} + + scope_manager: ContextManager | None = None + + async def start_soon( + self, + async_fn, + *args, + + name=None, + scope_manager: ContextManager | None = None, + + ) -> tuple[CancelScope, Task]: + + # NOTE: internals of a nursery don't let you know what + # the most recently spawned task is by order.. so we'd + # have to either change that or do set ops. + # pre_start_tasks: set[Task] = n._children.copy() + # new_tasks = n._children - pre_start_Tasks + # assert len(new_tasks) == 1 + # task = new_tasks.pop() + + n: Nursery = self._n + cs = CancelScope() + new_task: Task | None = None + to_return: tuple[Any] | None = None + maybe_outcome = MaybeOutcome() + + sm = self.scope_manager + if sm is None: + mngr = nullcontext([cs]) + else: + mngr = sm( + nursery=n, + scope=cs, + maybe_outcome=maybe_outcome, + ) + + async def _start_wrapped_in_scope( + task_status: TaskStatus[ + tuple[CancelScope, Task] + ] = trio.TASK_STATUS_IGNORED, + + ) -> None: + nonlocal maybe_outcome + nonlocal to_return + + with cs: + + task = trio.lowlevel.current_task() + self._scopes[cs] = task + + # TODO: instead we should probably just use + # `Outcome.send(mngr)` here no and inside a custom + # decorator `@trio.cancel_scope_manager` enforce + # that it's a single yield generator? + with mngr as to_return: + + # TODO: relay through whatever the + # started task passes back via `.started()` ? + # seems like that won't work with also returning + # a "task handle"? + task_status.started() + + # invoke underlying func now that cs is entered. + outcome = await acapture(async_fn, *args) + + # TODO: instead, mngr.send(outcome) so that we don't + # tie this `.start_soon()` impl to the + # `MaybeOutcome` type? Use `Outcome.send(mngr)` + # right? + maybe_outcome._set_outcome(outcome) + + await n.start(_start_wrapped_in_scope) + assert to_return is not None + + # TODO: better way to concat the values delivered by the user + # provided `.scope_manager` and the outcome? + return tuple([maybe_outcome] + to_return) + + +# TODO: maybe just make this a generator with a single yield that also +# delivers a value (of some std type) from the yield expression? +# @trio.cancel_scope_manager +@cm +def add_task_handle_and_crash_handling( + nursery: Nursery, + scope: CancelScope, + maybe_outcome: MaybeOutcome, + +) -> Generator[None, list[Any]]: + + cs: CancelScope = CancelScope() + + # if you need it you can ask trio for the task obj + task: Task = trio.lowlevel.current_task() + print(f'Spawning task: {task.name}') + + try: + # yields back when task is terminated, cancelled, returns? + with cs: + # the yielded values here are what are returned to the + # nursery's `.start_soon()` caller + + # TODO: actually make this work so that `MaybeOutcome` isn't + # tied to the impl of `.start_soon()` on our custom nursery! + task_outcome: Outcome = yield [cs] + + except Exception as err: + # Adds "crash handling" from `pdbp` by entering + # a REPL on std errors. + pdbp.xpm() + raise + + +@acm +async def open_nursery( + scope_manager = None, + **kwargs, +): + async with trio.open_nursery(**kwargs) as nurse: + yield ScopePerTaskNursery( + nurse, + scope_manager=scope_manager, + ) + + +async def sleep_then_err(): + await trio.sleep(1) + assert 0 + + +async def sleep_then_return_val(val: str): + await trio.sleep(0.2) + return val + + +if __name__ == '__main__': + + async def main(): + async with open_nursery( + scope_manager=add_task_handle_and_crash_handling, + ) as sn: + for _ in range(3): + outcome, cs = await sn.start_soon(trio.sleep_forever) + + # extra task we want to engage in debugger post mortem. + err_outcome, *_ = await sn.start_soon(sleep_then_err) + + val: str = 'yoyoyo' + val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) + res = await val_outcome.unwrap() + assert res == val + print(f'GOT EXPECTED TASK VALUE: {res}') + + print('WAITING FOR CRASH..') + + trio.run(main) From b4858710a9317344f993be299b86be25fd6da3b2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 May 2023 15:27:29 -0400 Subject: [PATCH 2/8] Alias to `@acm` in broadcaster mod --- tractor/trionics/_broadcast.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 42b1704b..115509dc 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -22,7 +22,7 @@ from __future__ import annotations from abc import abstractmethod from collections import deque -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from functools import partial from operator import ne from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol @@ -385,7 +385,7 @@ async def receive(self) -> ReceiveType: return await self._receive_from_underlying(key, state) - @asynccontextmanager + @acm async def subscribe( self, raise_on_lag: bool = True, From 65c5d7da4ee9c1cf8aa7aa4ac02a2aaae6198969 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 13:13:21 -0400 Subject: [PATCH 3/8] Do renaming, implement lowlevel `Outcome` sending As was listed in the many todos, this changes the `.start_soon()` impl to instead (manually) `.send()` into the user defined `@task_scope_manager` an `Outcome` from the spawned task. In this case the task manager wraps that in a user defined (and renamed) `TaskOutcome` and delivers that + a containing `trio.CancelScope` to the `.start_soon()` caller. Here the user defined `TaskOutcome` defines a `.wait_for_result()` method that can be used to await the task's exit and handle it's underlying returned value or raised error; the implementation could be different and subject to the user's own whims. Note that by default, if this was added to `trio`'s core, the `@task_scope_manager` would simply be implemented as either a `None` yielding single-yield-generator but more likely just entirely ignored by the runtime (as in no manual task outcome collecting, generator calling and sending is done at all) by default if the user does not provide the `task_scope_manager` to the nursery at open time. --- tractor/trionics/_supervisor.py | 151 +++++++++++++++++++------------- 1 file changed, 90 insertions(+), 61 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index faeb7616..2a664a9f 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -39,11 +39,17 @@ Nursery, ) -class MaybeOutcome(Struct): - _ready: Event = trio.Event() - _outcome: Outcome | None = None - _result: Any | None = None +class TaskOutcome(Struct): + ''' + The outcome of a scheduled ``trio`` task which includes an interface + for synchronizing to the completion of the task's runtime and access + to the eventual boxed result/value or raised exception. + + ''' + _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` + _outcome: Outcome | None = None # as per `outcome.Outcome` + _result: Any | None = None # the eventual maybe-returned-value @property def result(self) -> Any: @@ -55,7 +61,7 @@ def result(self) -> Any: raise RuntimeError( # f'Task {task.name} is not complete.\n' f'Outcome is not complete.\n' - 'wait on `await MaybeOutcome.unwrap()` first!' + 'wait on `await TaskOutcome.wait_for_result()` first!' ) return self._result @@ -63,19 +69,27 @@ def _set_outcome( self, outcome: Outcome, ): + ''' + Set the ``Outcome`` for this task. + + This method should only ever be called by the task's supervising + nursery implemenation. + + ''' self._outcome = outcome self._result = outcome.unwrap() - self._ready.set() - - # TODO: maybe a better name like, - # - .wait_and_unwrap() - # - .wait_unwrap() - # - .aunwrap() ? - async def unwrap(self) -> Any: - if self._ready.is_set(): + self._exited.set() + + async def wait_for_result(self) -> Any: + ''' + Unwind the underlying task's ``Outcome`` by async waiting for + the task to first complete and then unwrap it's result-value. + + ''' + if self._exited.is_set(): return self._result - await self._ready.wait() + await self._exited.wait() out = self._outcome if out is None: @@ -84,13 +98,6 @@ async def unwrap(self) -> Any: return self.result -class TaskHandle(Struct): - task: Task - cs: CancelScope - exited: Event | None = None - _outcome: Outcome | None = None - - class ScopePerTaskNursery(Struct): _n: Nursery _scopes: dict[ @@ -122,17 +129,14 @@ async def start_soon( cs = CancelScope() new_task: Task | None = None to_return: tuple[Any] | None = None - maybe_outcome = MaybeOutcome() sm = self.scope_manager if sm is None: mngr = nullcontext([cs]) else: - mngr = sm( - nursery=n, - scope=cs, - maybe_outcome=maybe_outcome, - ) + # NOTE: what do we enforce as a signature for the + # `@task_scope_manager` here? + mngr = sm(nursery=n, scope=cs) async def _start_wrapped_in_scope( task_status: TaskStatus[ @@ -140,55 +144,81 @@ async def _start_wrapped_in_scope( ] = trio.TASK_STATUS_IGNORED, ) -> None: - nonlocal maybe_outcome - nonlocal to_return + + # TODO: this was working before?! + # nonlocal to_return with cs: task = trio.lowlevel.current_task() self._scopes[cs] = task - # TODO: instead we should probably just use - # `Outcome.send(mngr)` here no and inside a custom - # decorator `@trio.cancel_scope_manager` enforce - # that it's a single yield generator? - with mngr as to_return: - - # TODO: relay through whatever the - # started task passes back via `.started()` ? - # seems like that won't work with also returning - # a "task handle"? - task_status.started() + # execute up to the first yield + try: + to_return: tuple[Any] = next(mngr) + except StopIteration: + raise RuntimeError("task manager didn't yield") from None + + # TODO: how do we support `.start()` style? + # - relay through whatever the + # started task passes back via `.started()` ? + # seems like that won't work with also returning + # a "task handle"? + # - we were previously binding-out this `to_return` to + # the parent's lexical scope, why isn't that working + # now? + task_status.started(to_return) + + # invoke underlying func now that cs is entered. + outcome = await acapture(async_fn, *args) + + # execute from the 1st yield to return and expect + # generator-mngr `@task_scope_manager` thinger to + # terminate! + try: + mngr.send(outcome) + + # NOTE: this will instead send the underlying + # `.value`? Not sure if that's better or not? + # I would presume it's better to have a handle to + # the `Outcome` entirely? This method sends *into* + # the mngr this `Outcome.value`; seems like kinda + # weird semantics for our purposes? + # outcome.send(mngr) + + except StopIteration: + return + else: + raise RuntimeError(f"{mngr} didn't stop!") + + to_return = await n.start(_start_wrapped_in_scope) + assert to_return is not None - # invoke underlying func now that cs is entered. - outcome = await acapture(async_fn, *args) + # TODO: use the fancy type-check-time type signature stuff from + # mypy i guess..to like, relay the type of whatever the + # generator yielded through? betcha that'll be un-grokable XD + return to_return - # TODO: instead, mngr.send(outcome) so that we don't - # tie this `.start_soon()` impl to the - # `MaybeOutcome` type? Use `Outcome.send(mngr)` - # right? - maybe_outcome._set_outcome(outcome) - await n.start(_start_wrapped_in_scope) - assert to_return is not None - # TODO: better way to concat the values delivered by the user - # provided `.scope_manager` and the outcome? - return tuple([maybe_outcome] + to_return) +# TODO: you could wrap your output task handle in this? +# class TaskHandle(Struct): +# task: Task +# cs: CancelScope +# outcome: TaskOutcome # TODO: maybe just make this a generator with a single yield that also # delivers a value (of some std type) from the yield expression? -# @trio.cancel_scope_manager -@cm +# @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, scope: CancelScope, - maybe_outcome: MaybeOutcome, ) -> Generator[None, list[Any]]: cs: CancelScope = CancelScope() + task_outcome = TaskOutcome() # if you need it you can ask trio for the task obj task: Task = trio.lowlevel.current_task() @@ -197,12 +227,11 @@ def add_task_handle_and_crash_handling( try: # yields back when task is terminated, cancelled, returns? with cs: - # the yielded values here are what are returned to the - # nursery's `.start_soon()` caller - # TODO: actually make this work so that `MaybeOutcome` isn't - # tied to the impl of `.start_soon()` on our custom nursery! - task_outcome: Outcome = yield [cs] + # the yielded value(s) here are what are returned to the + # nursery's `.start_soon()` caller B) + lowlevel_outcome: Outcome = yield (task_outcome, cs) + task_outcome._set_outcome(lowlevel_outcome) except Exception as err: # Adds "crash handling" from `pdbp` by entering @@ -247,7 +276,7 @@ async def main(): val: str = 'yoyoyo' val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) - res = await val_outcome.unwrap() + res = await val_outcome.wait_for_result() assert res == val print(f'GOT EXPECTED TASK VALUE: {res}') From f23b5b89dd090599ae53da1c1d25f14928cdbd0d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 13:34:00 -0400 Subject: [PATCH 4/8] Facepalm, don't pass in unecessary cancel scope --- tractor/trionics/_supervisor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index 2a664a9f..dcd20502 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -136,7 +136,7 @@ async def start_soon( else: # NOTE: what do we enforce as a signature for the # `@task_scope_manager` here? - mngr = sm(nursery=n, scope=cs) + mngr = sm(nursery=n) async def _start_wrapped_in_scope( task_status: TaskStatus[ @@ -213,11 +213,9 @@ async def _start_wrapped_in_scope( # @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, - scope: CancelScope, ) -> Generator[None, list[Any]]: - cs: CancelScope = CancelScope() task_outcome = TaskOutcome() # if you need it you can ask trio for the task obj @@ -226,7 +224,7 @@ def add_task_handle_and_crash_handling( try: # yields back when task is terminated, cancelled, returns? - with cs: + with CancelScope() as cs: # the yielded value(s) here are what are returned to the # nursery's `.start_soon()` caller B) From 56882b680cc9b29cece16f8c6d8188304765b652 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 14:03:07 -0400 Subject: [PATCH 5/8] Ensure user-allocated cancel scope just works! Turns out the nursery doesn't have to care about allocating a per task `CancelScope` since the user can just do that in the `@task_scope_manager` if desired B) So just mask all the nursery cs allocating with the intention of removal. Also add a test for per-task-cancellation by starting the crash task as a `trio.sleep_forever()` but then cancel it via the user allocated cs and ensure the crash propagates as expected :boom: --- tractor/trionics/_supervisor.py | 146 +++++++++++++++++++------------- 1 file changed, 89 insertions(+), 57 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index dcd20502..d23e1df8 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -126,18 +126,23 @@ async def start_soon( # task = new_tasks.pop() n: Nursery = self._n - cs = CancelScope() - new_task: Task | None = None - to_return: tuple[Any] | None = None sm = self.scope_manager + # we do default behavior of a scope-per-nursery + # if the user did not provide a task manager. if sm is None: - mngr = nullcontext([cs]) - else: - # NOTE: what do we enforce as a signature for the - # `@task_scope_manager` here? - mngr = sm(nursery=n) + return n.start_soon(async_fn, *args, name=None) + # per_task_cs = CancelScope() + new_task: Task | None = None + to_return: tuple[Any] | None = None + + # NOTE: what do we enforce as a signature for the + # `@task_scope_manager` here? + mngr = sm( + nursery=n, + # scope=per_task_cs, + ) async def _start_wrapped_in_scope( task_status: TaskStatus[ tuple[CancelScope, Task] @@ -148,48 +153,49 @@ async def _start_wrapped_in_scope( # TODO: this was working before?! # nonlocal to_return - with cs: - - task = trio.lowlevel.current_task() - self._scopes[cs] = task - - # execute up to the first yield - try: - to_return: tuple[Any] = next(mngr) - except StopIteration: - raise RuntimeError("task manager didn't yield") from None - - # TODO: how do we support `.start()` style? - # - relay through whatever the - # started task passes back via `.started()` ? - # seems like that won't work with also returning - # a "task handle"? - # - we were previously binding-out this `to_return` to - # the parent's lexical scope, why isn't that working - # now? - task_status.started(to_return) - - # invoke underlying func now that cs is entered. - outcome = await acapture(async_fn, *args) - - # execute from the 1st yield to return and expect - # generator-mngr `@task_scope_manager` thinger to - # terminate! - try: - mngr.send(outcome) - - # NOTE: this will instead send the underlying - # `.value`? Not sure if that's better or not? - # I would presume it's better to have a handle to - # the `Outcome` entirely? This method sends *into* - # the mngr this `Outcome.value`; seems like kinda - # weird semantics for our purposes? - # outcome.send(mngr) - - except StopIteration: - return - else: - raise RuntimeError(f"{mngr} didn't stop!") + task = trio.lowlevel.current_task() + # self._scopes[per_task_cs] = task + + # NOTE: we actually don't need this since the user can + # just to it themselves inside mngr! + # with per_task_cs: + + # execute up to the first yield + try: + to_return: tuple[Any] = next(mngr) + except StopIteration: + raise RuntimeError("task manager didn't yield") from None + + # TODO: how do we support `.start()` style? + # - relay through whatever the + # started task passes back via `.started()` ? + # seems like that won't work with also returning + # a "task handle"? + # - we were previously binding-out this `to_return` to + # the parent's lexical scope, why isn't that working + # now? + task_status.started(to_return) + + # invoke underlying func now that cs is entered. + outcome = await acapture(async_fn, *args) + + # execute from the 1st yield to return and expect + # generator-mngr `@task_scope_manager` thinger to + # terminate! + try: + mngr.send(outcome) + + + # I would presume it's better to have a handle to + # the `Outcome` entirely? This method sends *into* + # the mngr this `Outcome.value`; seems like kinda + # weird semantics for our purposes? + # outcome.send(mngr) + + except StopIteration: + return + else: + raise RuntimeError(f"{mngr} didn't stop!") to_return = await n.start(_start_wrapped_in_scope) assert to_return is not None @@ -200,7 +206,6 @@ async def _start_wrapped_in_scope( return to_return - # TODO: you could wrap your output task handle in this? # class TaskHandle(Struct): # task: Task @@ -214,6 +219,11 @@ async def _start_wrapped_in_scope( def add_task_handle_and_crash_handling( nursery: Nursery, + # TODO: is this the only way we can have a per-task scope + # allocated or can we allow the user to somehow do it if + # they want below? + # scope: CancelScope, + ) -> Generator[None, list[Any]]: task_outcome = TaskOutcome() @@ -222,8 +232,12 @@ def add_task_handle_and_crash_handling( task: Task = trio.lowlevel.current_task() print(f'Spawning task: {task.name}') + # yields back when task is terminated, cancelled, returns. try: - # yields back when task is terminated, cancelled, returns? + # XXX: wait, this isn't doing anything right since we'd have to + # manually activate this scope using something like: + # `task._activate_cancel_status(cs._cancel_status)` ?? + # oh wait, but `.__enter__()` does all that already? with CancelScope() as cs: # the yielded value(s) here are what are returned to the @@ -260,6 +274,19 @@ async def sleep_then_return_val(val: str): return val +async def ensure_cancelled(): + try: + await trio.sleep_forever() + + except trio.Cancelled: + task = trio.lowlevel.current_task() + print(f'heyyo ONLY {task.name} was cancelled as expected B)') + assert 0 + + except BaseException: + raise RuntimeError("woa woa woa this ain't right!") + + if __name__ == '__main__': async def main(): @@ -267,17 +294,22 @@ async def main(): scope_manager=add_task_handle_and_crash_handling, ) as sn: for _ in range(3): - outcome, cs = await sn.start_soon(trio.sleep_forever) + outcome, _ = await sn.start_soon(trio.sleep_forever) # extra task we want to engage in debugger post mortem. - err_outcome, *_ = await sn.start_soon(sleep_then_err) + err_outcome, cs = await sn.start_soon(ensure_cancelled) val: str = 'yoyoyo' - val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) + val_outcome, _ = await sn.start_soon( + sleep_then_return_val, + val, + ) res = await val_outcome.wait_for_result() assert res == val - print(f'GOT EXPECTED TASK VALUE: {res}') + print(f'{res} -> GOT EXPECTED TASK VALUE') - print('WAITING FOR CRASH..') + await trio.sleep(0.6) + print('Cancelling and waiting for CRASH..') + cs.cancel() trio.run(main) From 940e65fccf83dd4fb814c54519c4ad0708d90cda Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 14:23:22 -0400 Subject: [PATCH 6/8] More refinements and proper typing - drop unneeded (and commented) internal cs allocating bits. - bypass all task manager stuff if no generator is provided by the caller; i.e. just call `.start_soon()` as normal. - fix `Generator` typing. - add some prints around task manager. - wrap in `TaskOutcome.lowlevel_task: Task`. --- tractor/trionics/_supervisor.py | 81 +++++++++++++++++---------------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index d23e1df8..46a1ccdf 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -24,7 +24,10 @@ contextmanager as cm, nullcontext, ) -from typing import ContextManager +from typing import ( + Generator, + Any, +) from outcome import ( Outcome, @@ -47,6 +50,7 @@ class TaskOutcome(Struct): to the eventual boxed result/value or raised exception. ''' + lowlevel_task: Task _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` _outcome: Outcome | None = None # as per `outcome.Outcome` _result: Any | None = None # the eventual maybe-returned-value @@ -105,7 +109,7 @@ class ScopePerTaskNursery(Struct): tuple[CancelScope, Outcome] ] = {} - scope_manager: ContextManager | None = None + scope_manager: Generator[Any, Outcome, None] | None = None async def start_soon( self, @@ -133,16 +137,13 @@ async def start_soon( if sm is None: return n.start_soon(async_fn, *args, name=None) - # per_task_cs = CancelScope() new_task: Task | None = None to_return: tuple[Any] | None = None # NOTE: what do we enforce as a signature for the # `@task_scope_manager` here? - mngr = sm( - nursery=n, - # scope=per_task_cs, - ) + mngr = sm(nursery=n) + async def _start_wrapped_in_scope( task_status: TaskStatus[ tuple[CancelScope, Task] @@ -153,13 +154,6 @@ async def _start_wrapped_in_scope( # TODO: this was working before?! # nonlocal to_return - task = trio.lowlevel.current_task() - # self._scopes[per_task_cs] = task - - # NOTE: we actually don't need this since the user can - # just to it themselves inside mngr! - # with per_task_cs: - # execute up to the first yield try: to_return: tuple[Any] = next(mngr) @@ -206,15 +200,9 @@ async def _start_wrapped_in_scope( return to_return -# TODO: you could wrap your output task handle in this? -# class TaskHandle(Struct): -# task: Task -# cs: CancelScope -# outcome: TaskOutcome - - -# TODO: maybe just make this a generator with a single yield that also -# delivers a value (of some std type) from the yield expression? +# TODO: define a decorator to runtime type check that this a generator +# with a single yield that also delivers a value (of some std type) from +# the yield expression? # @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, @@ -224,20 +212,35 @@ def add_task_handle_and_crash_handling( # they want below? # scope: CancelScope, -) -> Generator[None, list[Any]]: +) -> Generator[ + Any, + Outcome, + None, +]: + ''' + A customizable, user defined "task scope manager". - task_outcome = TaskOutcome() + With this specially crafted single-yield generator function you can + add more granular controls around every task spawned by `trio` B) + ''' # if you need it you can ask trio for the task obj task: Task = trio.lowlevel.current_task() print(f'Spawning task: {task.name}') - # yields back when task is terminated, cancelled, returns. + # User defined "task handle" for more granular supervision + # of each spawned task as needed for their particular usage. + task_outcome = TaskOutcome(task) + + # NOTE: if wanted the user could wrap the output task handle however + # they want! + # class TaskHandle(Struct): + # task: Task + # cs: CancelScope + # outcome: TaskOutcome + + # this yields back when the task is terminated, cancelled or returns. try: - # XXX: wait, this isn't doing anything right since we'd have to - # manually activate this scope using something like: - # `task._activate_cancel_status(cs._cancel_status)` ?? - # oh wait, but `.__enter__()` does all that already? with CancelScope() as cs: # the yielded value(s) here are what are returned to the @@ -245,12 +248,16 @@ def add_task_handle_and_crash_handling( lowlevel_outcome: Outcome = yield (task_outcome, cs) task_outcome._set_outcome(lowlevel_outcome) + # Adds "crash handling" from `pdbp` by entering + # a REPL on std errors. except Exception as err: - # Adds "crash handling" from `pdbp` by entering - # a REPL on std errors. + print(f'{task.name} crashed, entering debugger!') pdbp.xpm() raise + finally: + print(f'{task.name} Exitted') + @acm async def open_nursery( @@ -264,11 +271,6 @@ async def open_nursery( ) -async def sleep_then_err(): - await trio.sleep(1) - assert 0 - - async def sleep_then_return_val(val: str): await trio.sleep(0.2) return val @@ -309,7 +311,10 @@ async def main(): print(f'{res} -> GOT EXPECTED TASK VALUE') await trio.sleep(0.6) - print('Cancelling and waiting for CRASH..') + print( + 'Cancelling and waiting on {err_outcome.lowlevel_task} ' + 'to CRASH..' + ) cs.cancel() trio.run(main) From e0c888fd5c822299ae4ba344c90ca10efc398f56 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 14:49:10 -0400 Subject: [PATCH 7/8] Go all in on "task manager" naming --- tractor/trionics/_supervisor.py | 37 +++++++++++++-------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index 46a1ccdf..2a5f73e1 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -22,7 +22,6 @@ from contextlib import ( asynccontextmanager as acm, contextmanager as cm, - nullcontext, ) from typing import ( Generator, @@ -51,7 +50,7 @@ class TaskOutcome(Struct): ''' lowlevel_task: Task - _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` + _exited = trio.Event() # as per `trio.Runner.task_exited()` _outcome: Outcome | None = None # as per `outcome.Outcome` _result: Any | None = None # the eventual maybe-returned-value @@ -63,9 +62,8 @@ def result(self) -> Any: ''' if self._outcome is None: raise RuntimeError( - # f'Task {task.name} is not complete.\n' - f'Outcome is not complete.\n' - 'wait on `await TaskOutcome.wait_for_result()` first!' + f'Task {self.lowlevel_task.name} is not complete.\n' + 'First wait on `await TaskOutcome.wait_for_result()`!' ) return self._result @@ -102,14 +100,14 @@ async def wait_for_result(self) -> Any: return self.result -class ScopePerTaskNursery(Struct): +class TaskManagerNursery(Struct): _n: Nursery _scopes: dict[ Task, tuple[CancelScope, Outcome] ] = {} - scope_manager: Generator[Any, Outcome, None] | None = None + task_manager: Generator[Any, Outcome, None] | None = None async def start_soon( self, @@ -117,7 +115,7 @@ async def start_soon( *args, name=None, - scope_manager: ContextManager | None = None, + task_manager: Generator[Any, Outcome, None] | None = None ) -> tuple[CancelScope, Task]: @@ -131,7 +129,7 @@ async def start_soon( n: Nursery = self._n - sm = self.scope_manager + sm = self.task_manager # we do default behavior of a scope-per-nursery # if the user did not provide a task manager. if sm is None: @@ -151,7 +149,8 @@ async def _start_wrapped_in_scope( ) -> None: - # TODO: this was working before?! + # TODO: this was working before?! and, do we need something + # like it to implement `.start()`? # nonlocal to_return # execute up to the first yield @@ -203,15 +202,10 @@ async def _start_wrapped_in_scope( # TODO: define a decorator to runtime type check that this a generator # with a single yield that also delivers a value (of some std type) from # the yield expression? -# @trio.task_scope_manager +# @trio.task_manager def add_task_handle_and_crash_handling( nursery: Nursery, - # TODO: is this the only way we can have a per-task scope - # allocated or can we allow the user to somehow do it if - # they want below? - # scope: CancelScope, - ) -> Generator[ Any, Outcome, @@ -261,14 +255,11 @@ def add_task_handle_and_crash_handling( @acm async def open_nursery( - scope_manager = None, + task_manager = None, **kwargs, ): async with trio.open_nursery(**kwargs) as nurse: - yield ScopePerTaskNursery( - nurse, - scope_manager=scope_manager, - ) + yield TaskManagerNursery(nurse, task_manager=task_manager) async def sleep_then_return_val(val: str): @@ -293,7 +284,7 @@ async def ensure_cancelled(): async def main(): async with open_nursery( - scope_manager=add_task_handle_and_crash_handling, + task_manager=add_task_handle_and_crash_handling, ) as sn: for _ in range(3): outcome, _ = await sn.start_soon(trio.sleep_forever) @@ -312,7 +303,7 @@ async def main(): await trio.sleep(0.6) print( - 'Cancelling and waiting on {err_outcome.lowlevel_task} ' + f'Cancelling and waiting on {err_outcome.lowlevel_task} ' 'to CRASH..' ) cs.cancel() From c7e27ad09d77a9f20396295d85896d6ec4ebafa5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 15:51:47 -0400 Subject: [PATCH 8/8] Add `debug_mode: bool` control to task mngr Allows dynamically importing `pdbp` when enabled and a way for eventually linking with `tractor`'s own debug mode flag. --- tractor/trionics/_supervisor.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index 2a5f73e1..1012571d 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -23,6 +23,7 @@ asynccontextmanager as acm, contextmanager as cm, ) +from functools import partial from typing import ( Generator, Any, @@ -32,7 +33,6 @@ Outcome, acapture, ) -import pdbp from msgspec import Struct import trio from trio._core._run import ( @@ -206,6 +206,8 @@ async def _start_wrapped_in_scope( def add_task_handle_and_crash_handling( nursery: Nursery, + debug_mode: bool = False, + ) -> Generator[ Any, Outcome, @@ -246,7 +248,9 @@ def add_task_handle_and_crash_handling( # a REPL on std errors. except Exception as err: print(f'{task.name} crashed, entering debugger!') - pdbp.xpm() + if debug_mode: + import pdbp + pdbp.xpm() raise finally: @@ -255,11 +259,15 @@ def add_task_handle_and_crash_handling( @acm async def open_nursery( - task_manager = None, - **kwargs, + task_manager: Generator[Any, Outcome, None] | None = None, + + **lowlevel_nursery_kwargs, ): - async with trio.open_nursery(**kwargs) as nurse: - yield TaskManagerNursery(nurse, task_manager=task_manager) + async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse: + yield TaskManagerNursery( + nurse, + task_manager=task_manager, + ) async def sleep_then_return_val(val: str): @@ -284,7 +292,10 @@ async def ensure_cancelled(): async def main(): async with open_nursery( - task_manager=add_task_handle_and_crash_handling, + task_manager=partial( + add_task_handle_and_crash_handling, + debug_mode=True, + ), ) as sn: for _ in range(3): outcome, _ = await sn.start_soon(trio.sleep_forever)