Skip to content

Commit

Permalink
Move otel support to a separate package (#216)
Browse files Browse the repository at this point in the history
* Move otel support to a separate package

https://github.com/anna-money/opentelemetry-instrumentation-asyncpg-listen

* Remove otel from changelog

* Use enum.auto
  • Loading branch information
Pliner authored Sep 7, 2024
1 parent 1d023fe commit 10516bc
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 58 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
## unreleased

* [Implement metrics/tracing on top of OpenTelemetry](https://github.com/anna-money/asyncpg-listen/pull/199)
* [Support python 3.12 and end support of 3.8](https://github.com/anna-money/asyncpg-listen/pull/211)
* [End support of python 3.9 and 3.10](https://github.com/anna-money/asyncpg-listen/pull/215)
* [Eagerly start notification processing with Python 3.12+](https://github.com/anna-money/asyncpg-listen/pull/212)
Expand Down
74 changes: 20 additions & 54 deletions asyncpg_listen/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@
import enum
import logging
import sys
import time
from typing import Any, Callable, Coroutine

import asyncpg
import opentelemetry.metrics
import opentelemetry.trace

logger = logging.getLogger(__package__)


class ListenPolicy(enum.StrEnum):
ALL = "ALL"
LAST = "LAST"
ALL = enum.auto()
LAST = enum.auto()


@dataclasses.dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -44,14 +41,14 @@ async def _connect() -> asyncpg.Connection:


class NotificationListener:
__slots__ = ("_connect", "_reconnect_delay", "_tracer", "_meter", "_notification_histogram")
__slots__ = (
"__connect",
"__reconnect_delay",
)

def __init__(self, connect: ConnectFunc, reconnect_delay: float = 5) -> None:
self._reconnect_delay = reconnect_delay
self._connect = connect
self._tracer = None
self._meter = None
self._notification_histogram = None
self.__reconnect_delay = reconnect_delay
self.__connect = connect

async def run(
self,
Expand All @@ -65,14 +62,14 @@ async def run(
}
async with asyncio.TaskGroup() as tg:
tg.create_task(
self._read_notifications(
self.__read_notifications(
queue_per_channel=queue_per_channel, check_interval=max(1.0, notification_timeout / 3.0)
),
name=__package__,
)
for channel, handler in handler_per_channel.items():
tg.create_task(
self._process_notifications(
self.__process_notifications(
channel,
notifications=queue_per_channel[channel],
handler=handler,
Expand All @@ -82,15 +79,17 @@ async def run(
name=f"{__package__}.{channel}",
)

async def _process_notifications(
self,
@staticmethod
async def __process_notifications(
channel: str,
*,
notifications: asyncio.Queue[Notification],
handler: NotificationHandler,
policy: ListenPolicy,
notification_timeout: float,
) -> None:
# to have independent async context per run
# to protect from misuse of contextvars
if sys.version_info >= (3, 12):
loop = asyncio.get_running_loop()

Expand Down Expand Up @@ -124,23 +123,21 @@ async def run_coro(c: Coroutine) -> None:
continue

try:
# to have independent async context per run
# to protect from misuse of contextvars
await run_coro(self._process_notification(handler, notification))
await run_coro(handler(notification))
except Exception:
logger.exception("Failed to handle %s", notification)

async def _read_notifications(
async def __read_notifications(
self, queue_per_channel: dict[str, asyncio.Queue[Notification]], check_interval: float
) -> None:
failed_connect_attempts = 0
while True:
try:
connection = await self._connect()
connection = await self.__connect()
failed_connect_attempts = 0
try:
for channel, queue in queue_per_channel.items():
await connection.add_listener(channel, self._get_push_callback(queue))
await connection.add_listener(channel, self.__get_push_callback(queue))

while True:
await asyncio.sleep(check_interval)
Expand All @@ -150,43 +147,12 @@ async def _read_notifications(
except Exception:
logger.exception("Connection was lost or not established")

await asyncio.sleep(self._reconnect_delay * failed_connect_attempts)
await asyncio.sleep(self.__reconnect_delay * failed_connect_attempts)
failed_connect_attempts += 1

@staticmethod
def _get_push_callback(queue: asyncio.Queue[Notification]) -> Callable[[Any, Any, Any, Any], None]:
def __get_push_callback(queue: asyncio.Queue[Notification]) -> Callable[[Any, Any, Any, Any], None]:
def _push(_: Any, __: Any, channel: Any, payload: Any) -> None:
queue.put_nowait(Notification(channel, payload))

return _push

async def _process_notification(self, handler: NotificationHandler, notification: NotificationOrTimeout) -> None:
if self._tracer is None:
self._tracer = opentelemetry.trace.get_tracer(__package__) # type: ignore
if self._meter is None:
self._meter = opentelemetry.metrics.get_meter(__package__) # type: ignore
if self._notification_histogram is None:
self._notification_histogram = self._meter.create_histogram("asyncpg_listener_latency")

start_time = time.time_ns()

with self._tracer.start_as_current_span(
name=self._get_span_name(notification),
kind=opentelemetry.trace.SpanKind.INTERNAL,
start_time=start_time,
attributes={"channel": notification.channel},
):
try:
await handler(notification)
finally:
elapsed = max(0, time.time_ns() - start_time)
self._notification_histogram.record(elapsed, {"channel": notification.channel})

@staticmethod
def _get_span_name(notification: NotificationOrTimeout) -> str:
if isinstance(notification, Timeout):
return f"Notification timeout #{notification.channel}"
if isinstance(notification, Notification):
return f"Notification #{notification.channel}"

raise TypeError(f"Unexpected notification type: {type(notification)}")
1 change: 0 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ asyncpg==0.29.0
setuptools==74.0.0
wheel==0.44.0
twine==5.1.1
opentelemetry-api>=1.19
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

install_requires = [
"asyncpg>=0.27.0",
"opentelemetry-api>=1.19",
]


Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def _handle_client(

self.connections.add(server_writer)
self.connections.add(client_writer)

async with asyncio.TaskGroup() as tg:
tg.create_task(self._pipe(server_reader, client_writer))
tg.create_task(self._pipe(client_reader, server_writer))
Expand Down

0 comments on commit 10516bc

Please sign in to comment.