add @background_with_channel #3197

Open · wants to merge 13 commits into main
6 changes: 5 additions & 1 deletion docs/source/reference-core.rst
@@ -1607,7 +1607,11 @@ the numbers 0 through 9 with a 1-second delay before each one:

trio.run(use_it)

Trio supports async generators, with some caveats described in this section.
Trio supports async generators, but there are several caveats, and it's very
hard to handle them properly. Therefore Trio bundles a helper,
`trio.background_with_channel`, that does it for you.

The problems are described in detail in the following sections.
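
A minimal sketch of how the decorated generator is used (the generator
below is purely illustrative)::

    import trio

    @trio.background_with_channel()
    async def squares(n):
        for i in range(n):
            yield i * i

    async def main():
        async with squares(4) as recv_chan:
            async for value in recv_chan:
                print(value)  # 0, 1, 4, 9

    trio.run(main)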

Finalization
~~~~~~~~~~~~
9 changes: 9 additions & 0 deletions docs/source/reference-io.rst
@@ -181,6 +181,15 @@ Abstract base classes
.. currentmodule:: trio


Converting async generators to streams
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Async generators are very handy to write, but also very treacherous to use
correctly (see :ref:`async-generators`). Trio therefore provides a helper
that takes care of the hard parts for you.

.. autofunction:: trio.background_with_channel
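
Because the decorated function hands you a :class:`~trio.MemoryReceiveChannel`,
you are not limited to ``async for``: you can also pull items explicitly with
:meth:`~trio.MemoryReceiveChannel.receive`, for example under a timeout. Here
is a minimal sketch (the generator below is purely illustrative)::

    import trio

    @trio.background_with_channel()
    async def ticks():
        while True:
            await trio.sleep(1)
            yield "tick"

    async def main():
        async with ticks() as recv_chan:
            with trio.move_on_after(5):
                while True:
                    print(await recv_chan.receive())

    trio.run(main)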

Generic stream tools
~~~~~~~~~~~~~~~~~~~~

1 change: 1 addition & 0 deletions newsfragments/3197.feature.rst
@@ -0,0 +1 @@
Add :func:`@trio.background_with_channel <trio.background_with_channel>`, a wrapper that can be used to make async generators safe. See :ref:`async-generators`, `ASYNC900 <https://flake8-async.readthedocs.io/en/latest/rules.html#async900>`_, :pep:`789`, and :pep:`533`.
1 change: 1 addition & 0 deletions src/trio/__init__.py
@@ -27,6 +27,7 @@
MemoryChannelStatistics as MemoryChannelStatistics,
MemoryReceiveChannel as MemoryReceiveChannel,
MemorySendChannel as MemorySendChannel,
background_with_channel as background_with_channel,
open_memory_channel as open_memory_channel,
)
from ._core import (
128 changes: 127 additions & 1 deletion src/trio/_channel.py
@@ -1,10 +1,15 @@
from __future__ import annotations

import sys
from collections import OrderedDict, deque
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from functools import wraps
from math import inf
from typing import (
TYPE_CHECKING,
Generic,
Protocol,
TypeVar,
)

import attrs
@@ -17,9 +22,31 @@
from ._util import NoPublicConstructor, final, generic_function

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Awaitable, Callable
from types import TracebackType

from typing_extensions import Self
from typing_extensions import ParamSpec, Self

P = ParamSpec("P")

if sys.version_info >= (3, 10):
from contextlib import aclosing # new in Python 3.10
else:

class _SupportsAclose(Protocol):
def aclose(self) -> Awaitable[object]: ...

_SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose)

class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]):
def __init__(self, thing: _SupportsAcloseT) -> None:
self._aiter = thing

async def __aenter__(self) -> _SupportsAcloseT:
return self._aiter

async def __aexit__(self, *exc_info: object) -> None:
await self._aiter.aclose()


def _open_memory_channel(
@@ -440,3 +467,102 @@ async def aclose(self) -> None:
See `MemoryReceiveChannel.close`."""
self.close()
await trio.lowlevel.checkpoint()


def background_with_channel(max_buffer_size: float = 0) -> Callable[
[
Callable[P, AsyncGenerator[T, None]],
],
Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]],
]:
"""Decorate an async generator function to make it cancellation-safe.

The ``yield`` keyword offers a very convenient way to write iterators...
which makes it really unfortunate that async generators are so difficult
to call correctly. Yielding from the inside of a cancel scope or a nursery
to the outside `violates structured concurrency <https://xkcd.com/292/>`_
with consequences explained in :pep:`789`. Even then, resource cleanup
errors remain common (:pep:`533`) unless you wrap every call in
:func:`~contextlib.aclosing`.

This decorator gives you the best of both worlds: with careful exception
handling and a background task we preserve structured concurrency by
offering only the safe interface, and you can still write your iterables
with the convenience of ``yield``. For example::

@background_with_channel()
async def my_async_iterable(arg, *, kwarg=True):
while ...:
item = await ...
yield item

async with my_async_iterable(...) as recv_chan:
async for item in recv_chan:
...

While the combined async-with-async-for can be inconvenient at first,
the context manager is indispensable both for correctness and for prompt
cleanup of resources.

.. note::

With 'raw' async generators, code in the generator will never run
concurrently with that in the body of the ``async for`` loop - the
generator is resumed to compute each element on request.
Even with ``max_buffer_size=0``, a ``@background_with_channel()``
function will 'precompute' each element in a background task, and
send it to the internal channel, where it will wait until requested
by the loop.

This is rarely a problem, so we've avoided the performance cost
of exactly replicating the behavior of raw generators. If
concurrent execution would cause problems, we recommend using a
:class:`trio.Lock` around the critical sections.
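
For instance (a minimal sketch; ``fetch_next`` and ``process`` are
placeholders for your own code)::

    lock = trio.Lock()

    @background_with_channel()
    async def records():
        while True:
            async with lock:
                # critical section: only one task touches the
                # shared resource at a time
                item = await fetch_next()
            yield item

    async def consume():
        async with records() as recv_chan:
            async for item in recv_chan:
                async with lock:
                    process(item)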
"""
# Perhaps a future PEP will adopt `async with for` syntax, like
# https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for

def decorator(
fn: Callable[P, AsyncGenerator[T, None]],
) -> Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]]:
@asynccontextmanager
@wraps(fn)
async def context_manager(
*args: P.args, **kwargs: P.kwargs
) -> AsyncGenerator[trio.MemoryReceiveChannel[T], None]:
send_chan, recv_chan = trio.open_memory_channel[T](max_buffer_size)
async with trio.open_nursery() as nursery:
agen = fn(*args, **kwargs)
# `nursery.start` to make sure that we will clean up send_chan & agen
# If this errors we don't close `recv_chan`, but the caller
# never gets access to it, so that's not a problem.
await nursery.start(_move_elems_to_channel, agen, send_chan)
# `async with recv_chan` could eat exceptions, so use sync cm
with recv_chan:
yield recv_chan
# Return promptly, without waiting for the generator to yield the
# next value
nursery.cancel_scope.cancel()

return context_manager

async def _move_elems_to_channel(
agen: AsyncGenerator[T, None],
send_chan: trio.MemorySendChannel[T],
task_status: trio.TaskStatus,
) -> None:
# `async with send_chan` will eat exceptions,
# see https://github.com/python-trio/trio/issues/1559
with send_chan:
async with aclosing(agen):
task_status.started()
async for value in agen:
try:
# Send the value to the channel
await send_chan.send(value)
except trio.BrokenResourceError:
# Closing the corresponding receive channel should cause
# a clean shutdown of the generator.
return
Comment on lines +560 to +566

Member:

The loop in my prototype evolved from this one by @oremanj, via @belm0 here.

If we could trigger it, it'd have to be a non-Cancelled, non-BrokenResourceError exception raised while waiting in await send_chan.send(value). I think we could in principle get a few things here (e.g. KeyboardInterrupt, GC-related errors, etc.), but in each case calling .aclose() on the generator and raising from this function, without ever throwing the error into the generator, seems like a reasonable response to me.

So... I might be wrong, but this simpler code looks good to me.

Member Author:

Yeah, since your version had excepts for BrokenResourceError and Cancelled, it's only niche stuff that could get sent. And I don't see any reason why the generator should produce another value after the send gets cancelled or broken, since it'll just fail again.

Given that send has KI protection, I don't even think it can raise KeyboardInterrupt (unless that gets raised just as the decorator exits? I don't know the details of how that works).


return decorator
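
As the review discussion above notes, if the background task fails with anything
other than ``BrokenResourceError`` (or cancellation), whether the generator itself
raises or the failure happens while waiting in ``await send_chan.send(value)``,
the generator is simply closed via ``aclosing`` and the error propagates out of
the nursery; the caller sees it as an ``ExceptionGroup`` raised by the
``async with`` block, which is what ``test_background_with_channel_no_race``
below exercises. A minimal caller-side sketch (purely illustrative;
``except*`` needs Python 3.11+)::

    import trio

    @trio.background_with_channel()
    async def flaky():
        yield 1
        raise ValueError("boom")

    async def main():
        try:
            async with flaky() as recv_chan:
                async for item in recv_chan:
                    print(item)  # prints 1; the loop then ends cleanly
        except* ValueError:
            # the generator's error surfaces when the async with exits,
            # wrapped in an ExceptionGroup by the internal nursery
            print("generator failed")

    trio.run(main)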
113 changes: 110 additions & 3 deletions src/trio/_tests/test_channel.py
@@ -1,13 +1,16 @@
from __future__ import annotations

from typing import Union
from typing import TYPE_CHECKING, Union

import pytest

import trio
from trio import EndOfChannel, open_memory_channel
from trio import EndOfChannel, background_with_channel, open_memory_channel

from ..testing import assert_checkpoints, wait_all_tasks_blocked
from ..testing import RaisesGroup, assert_checkpoints, wait_all_tasks_blocked

if TYPE_CHECKING:
from collections.abc import AsyncGenerator


async def test_channel() -> None:
@@ -411,3 +414,107 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None:
assert await r.receive() == 1
with pytest.raises(trio.WouldBlock):
r.receive_nowait()


async def test_background_with_channel() -> None:
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
await trio.sleep_forever() # simulate deadlock

async with agen() as recv_chan:
async for x in recv_chan:
assert x == 1
break # exit, cleanup should be quick


async def test_background_with_channel_exhaust() -> None:
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1

async with agen() as recv_chan:
async for x in recv_chan:
assert x == 1


async def test_background_with_channel_broken_resource() -> None:
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
yield 2

async with agen() as recv_chan:
assert await recv_chan.__anext__() == 1

# close the receiving channel
await recv_chan.aclose()

# trying to get the next element errors
with pytest.raises(trio.ClosedResourceError):
await recv_chan.__anext__()

# but we don't get an error on exit of the cm


async def test_background_with_channel_cancelled() -> None:
with trio.CancelScope() as cs:

@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
yield 1

async with agen():
cs.cancel()


async def test_background_with_channel_no_race() -> None:
# this previously led to a race condition due to
# https://github.com/python-trio/trio/issues/1559
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
raise ValueError("oae")

with RaisesGroup(ValueError):
async with agen() as recv_chan:
async for x in recv_chan:
assert x == 1


async def test_background_with_channel_buffer_size_too_small(
autojump_clock: trio.testing.MockClock,
) -> None:
@background_with_channel(0)
async def agen() -> AsyncGenerator[int]:
yield 1
yield 2
raise AssertionError(
"buffer size 0 means we shouldn't be asked for another value"
) # pragma: no cover

with trio.move_on_after(5):
async with agen() as recv_chan:
async for x in recv_chan:
assert x == 1
await trio.sleep_forever()


async def test_background_with_channel_buffer_size_just_right(
autojump_clock: trio.testing.MockClock,
) -> None:
event = trio.Event()

@background_with_channel(2)
async def agen() -> AsyncGenerator[int]:
yield 1
yield 2
event.set()

async with agen() as recv_chan:
await event.wait()
assert await recv_chan.__anext__() == 1
assert await recv_chan.__anext__() == 2
with pytest.raises(StopAsyncIteration):
await recv_chan.__anext__()