Skip to content

Commit

Permalink
Add context test that opens an inter-task-channel that errors
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Jul 14, 2022
1 parent efa7cb5 commit b767377
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 3 deletions.
1 change: 1 addition & 0 deletions examples/infected_asyncio_echo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,

) -> None:

# a first message must be sent **from** this ``asyncio``
Expand Down
97 changes: 94 additions & 3 deletions tests/test_infected_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,26 @@
import pytest
import trio
import tractor
from tractor import to_asyncio
from tractor import RemoteActorError
from tractor import (
to_asyncio,
RemoteActorError,
ContextCancelled
)
from tractor.trionics import BroadcastReceiver


async def sleep_and_err(sleep_for: float = 0.1):
async def sleep_and_err(
sleep_for: float = 0.1,

# just signature placeholders for compat with
# ``to_asyncio.open_channel_from()``
to_trio: Optional[trio.MemorySendChannel] = None,
from_trio: Optional[asyncio.Queue] = None,

):
if to_trio:
to_trio.send_nowait('start')

await asyncio.sleep(sleep_for)
assert 0

Expand Down Expand Up @@ -146,6 +160,81 @@ async def main():
trio.run(main)


@tractor.context
async def trio_ctx(
ctx: tractor.Context,
):

await ctx.started('start')

# this will block until the ``asyncio`` task sends a "first"
# message.
with trio.fail_after(0.5):
async with (
tractor.to_asyncio.open_channel_from(
sleep_and_err,
) as (first, chan),

trio.open_nursery() as n,
):

assert first == 'start'

# spawn another asyncio task for the cuck of it.
n.start_soon(
tractor.to_asyncio.run_task,
sleep_forever,
)
# await trio.sleep_forever()


@pytest.mark.parametrize(
'parent_cancels', [False, True],
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
arb_addr,
parent_cancels: bool,
):
'''
Verify that spawning a task via an intertask channel ctx mngr that
errors correctly propagates the error back from the `asyncio`-side
taksk.
'''
async def main():

async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
# debug_mode=True,
loglevel='cancel',
)
async with p.open_context(
trio_ctx,
) as (ctx, first):

assert first == 'start'

if parent_cancels:
await p.cancel_actor()

await trio.sleep_forever()


with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)

err = excinfo.value
assert isinstance(err, RemoteActorError)
if parent_cancels:
assert err.type == trio.Cancelled
else:
assert err.type == AssertionError


async def aio_cancel():
''''
Cancel urself boi.
Expand Down Expand Up @@ -385,6 +474,8 @@ async def aio_echo_server(
print('breaking aio echo loop')
break

print('exiting asyncio task')

async with to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
Expand Down

0 comments on commit b767377

Please sign in to comment.