Skip to content

Commit

Permalink
Switch to asyncio.TaskGroup (#214)
Browse files Browse the repository at this point in the history
* Switch to asyncio.TaskGroup

* Fix
  • Loading branch information
Pliner authored Sep 4, 2024
1 parent c9183ea commit 1d023fe
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [Support python 3.12 and end support of 3.8](https://github.com/anna-money/asyncpg-listen/pull/211)
* [End support of python 3.9 and 3.10](https://github.com/anna-money/asyncpg-listen/pull/215)
* [Eagerly start notification processing with Python 3.12+](https://github.com/anna-money/asyncpg-listen/pull/212)
* [Switch to asyncio.TaskGroup](https://github.com/anna-money/asyncpg-listen/pull/214)


## v0.0.6 (2022-11-02)
Expand Down
47 changes: 16 additions & 31 deletions asyncpg_listen/listener.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import contextlib
import dataclasses
import enum
import logging
Expand Down Expand Up @@ -64,38 +63,24 @@ async def run(
queue_per_channel: dict[str, asyncio.Queue[Notification]] = {
channel: asyncio.Queue() for channel in handler_per_channel.keys()
}

read_notifications_task = asyncio.create_task(
self._read_notifications(
queue_per_channel=queue_per_channel, check_interval=max(1.0, notification_timeout / 3.0)
),
name=__package__,
)
process_notifications_tasks = [
asyncio.create_task(
self._process_notifications(
channel,
notifications=queue_per_channel[channel],
handler=handler,
policy=policy,
notification_timeout=notification_timeout,
async with asyncio.TaskGroup() as tg:
tg.create_task(
self._read_notifications(
queue_per_channel=queue_per_channel, check_interval=max(1.0, notification_timeout / 3.0)
),
name=f"{__package__}.{channel}",
name=__package__,
)
for channel, handler in handler_per_channel.items()
]
try:
await asyncio.gather(read_notifications_task, *process_notifications_tasks)
finally:
await self._cancel_and_await_tasks([read_notifications_task, *process_notifications_tasks])

@staticmethod
async def _cancel_and_await_tasks(tasks: list[asyncio.Task[None]]) -> None:
for t in tasks:
t.cancel()
for t in tasks:
with contextlib.suppress(asyncio.CancelledError):
await t
for channel, handler in handler_per_channel.items():
tg.create_task(
self._process_notifications(
channel,
notifications=queue_per_channel[channel],
handler=handler,
policy=policy,
notification_timeout=notification_timeout,
),
name=f"{__package__}.{channel}",
)

async def _process_notifications(
self,
Expand Down
11 changes: 4 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@ async def _handle_client(

self.connections.add(server_writer)
self.connections.add(client_writer)

await asyncio.wait(
[
asyncio.ensure_future(self._pipe(server_reader, client_writer)),
asyncio.ensure_future(self._pipe(client_reader, server_writer)),
]
)

async with asyncio.TaskGroup() as tg:
tg.create_task(self._pipe(server_reader, client_writer))
tg.create_task(self._pipe(client_reader, server_writer))


@pytest.fixture
Expand Down

0 comments on commit 1d023fe

Please sign in to comment.