From 9e23d75850155bf2e8a196be4b52f6a1c1fe60e7 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Mon, 24 Jul 2023 17:02:27 +0200 Subject: [PATCH 1/9] chg: [serializer] add support for phoenix version 2 serialization --- realtime/channel.py | 6 +++++- realtime/connection.py | 31 +++++++++++++++++++++---------- realtime/message.py | 3 ++- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index e4ac908..ba59a71 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -3,6 +3,7 @@ import asyncio import json from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple +from realtime.message import * from realtime.types import Callback @@ -51,8 +52,11 @@ async def _join(self) -> None: Coroutine that attempts to join Phoenix Realtime server via a certain topic :return: None """ - join_req = dict(topic=self.topic, event="phx_join", + if self.socket.version == 1: + join_req = dict(topic=self.topic, event="phx_join", payload={}, ref=None) + elif self.socket.version == 2: + join_req = [None, None, self.topic, ChannelEvents.join, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) diff --git a/realtime/connection.py b/realtime/connection.py index cc017a3..37f9b04 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -31,14 +31,15 @@ def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: class Socket: - def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any] = {}, hb_interval: int = 5) -> None: + def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any] = {}, hb_interval: int = 30, version: int = 2) -> None: """ `Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`. Socket-Channel has a 1-many relationship. Socket-Topic has a 1-many relationship. :param url: Websocket URL of the Realtime server. starts with `ws://` or `wss://` :param params: Optional parameters for connection. - :param hb_interval: WS connection is kept alive by sending a heartbeat message. Optional, defaults to 5. + :param hb_interval: WS connection is kept alive by sending a heartbeat message. Optional, defaults to 30. + :param version: phoenix JSON serializer version. """ self.url = url self.channels = defaultdict(list) @@ -48,6 +49,7 @@ def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any self.ws_connection: websockets.client.WebSocketClientProtocol self.kept_alive = False self.auto_reconnect = auto_reconnect + self.version = version self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) @@ -70,7 +72,11 @@ async def _listen(self) -> None: while True: try: msg = await self.ws_connection.recv() - msg = Message(**json.loads(msg)) + if self.version == 1 : + msg = Message(**json.loads(msg)) + elif self.version == 2: + msg_array = json.loads(msg) + msg = Message(chanid=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) if msg.event == ChannelEvents.reply: continue @@ -115,12 +121,17 @@ async def _keep_alive(self) -> None: """ while True: try: - data = dict( - topic=PHOENIX_CHANNEL, - event=ChannelEvents.heartbeat, - payload=HEARTBEAT_PAYLOAD, - ref=None, - ) + if self.version == 1 : + data = dict( + topic=PHOENIX_CHANNEL, + event=ChannelEvents.heartbeat, + payload=HEARTBEAT_PAYLOAD, + ref=None, + ) + elif self.version == 2 : + # [null,"4","phoenix","heartbeat",{}] + data = [None, None, PHOENIX_CHANNEL, ChannelEvents.heartbeat, HEARTBEAT_PAYLOAD] + await self.ws_connection.send(json.dumps(data)) await asyncio.sleep(self.hb_interval) except websockets.exceptions.ConnectionClosed: @@ -150,4 +161,4 @@ def summary(self) -> None: for topic, chans in self.channels.items(): for chan in chans: print( - f"Topic: {topic} | Events: {[e for e, _ in chan.callbacks]}]") + f"Topic: {topic} | Events: {[e for e, _ in chan.listeners]}]") diff --git a/realtime/message.py b/realtime/message.py index 9909d4d..df890b6 100644 --- a/realtime/message.py +++ b/realtime/message.py @@ -11,6 +11,7 @@ class Message: event: str payload: Dict[str, Any] ref: Any + chanid: Any topic: str def __hash__(self): @@ -32,4 +33,4 @@ class ChannelEvents(str, Enum): PHOENIX_CHANNEL = "phoenix" -HEARTBEAT_PAYLOAD = {"msg": "ping"} +HEARTBEAT_PAYLOAD = {} From e5bd3330cd06bb8ca922279cfc8948a4c4b94cf5 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 4 Oct 2023 12:44:39 +0200 Subject: [PATCH 2/9] add: [send] sending to the channel poc --- realtime/channel.py | 61 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/realtime/channel.py b/realtime/channel.py index ba59a71..eb5d9c1 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -53,7 +53,7 @@ async def _join(self) -> None: :return: None """ if self.socket.version == 1: - join_req = dict(topic=self.topic, event="phx_join", + join_req = dict(topic=self.topic, event=ChannelEvents.join, payload={}, ref=None) elif self.socket.version == 2: join_req = [None, None, self.topic, ChannelEvents.join, {}] @@ -64,6 +64,33 @@ async def _join(self) -> None: print(str(e)) # TODO: better error propagation return + def leave(self) -> Channel: + """ + Wrapper for async def _leave() to expose a non-async interface + Essentially gets the only event loop and attempt leaving a topic + :return: Channel + """ + loop = asyncio.get_event_loop() # TODO: replace with get_running_loop + loop.run_until_complete(self._join()) + return self + + async def _leave(self) -> None: + """ + Coroutine that attempts to join Phoenix Realtime server via a certain topic + :return: None + """ + if self.socket.version == 1: + join_req = dict(topic=self.topic, event=ChannelEvents.leave, + payload={}, ref=None) + elif self.socket.version == 2: + join_req = [None, None, self.topic, ChannelEvents.leave, {}] + + try: + await self.socket.ws_connection.send(json.dumps(join_req)) + except Exception as e: + print(str(e)) # TODO: better error propagation + return + def on(self, event: str, callback: Callback) -> Channel: """ :param event: A specific event will have a specific callback @@ -81,3 +108,35 @@ def off(self, event: str) -> None: """ self.listeners = [ callback for callback in self.listeners if callback.event != event] + + def send(self, event_name: str, payload:str) -> None: + """ + Wrapper for async def _send() to expose a non-async interface + Essentially gets the only event loop and attempt sending a payload + to a topic + :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. + :param payload: The payload to be sent to the phoenix server + :return: None + """ + loop = asyncio.get_event_loop() # TODO: replace with get_running_loop + loop.run_until_complete(self._send(event_name, payload)) + return self + + async def _send(self, event_name: str, payload:str) -> None: + """ + Coroutine that attempts to join Phoenix Realtime server via a certain topic + :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. + :param payload: The payload to be sent to the phoenix server + :return: None + """ + if self.socket.version == 1: + msg = dict(topic=self.topic, event=event_name, + payload=payload, ref=None) + elif self.socket.version == 2: + msg = [3, 3, self.topic, event_name, payload] + + try: + await self.socket.ws_connection.send(json.dumps(msg)) + except Exception as e: + print(str(e)) # TODO: better error propagation + return From 93bf4987c853b8e72a5e57821d8f2b2d7f8f00f7 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 4 Oct 2023 16:57:51 +0200 Subject: [PATCH 3/9] add: [send] listener on a msg ref --- poetry.lock | 14 ++++++++++++-- pyproject.toml | 1 + realtime/channel.py | 32 +++++++++++++++++++++----------- realtime/connection.py | 17 +++++++++++++---- realtime/message.py | 2 +- 5 files changed, 48 insertions(+), 18 deletions(-) diff --git a/poetry.lock b/poetry.lock index a74d67a..029988b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "colorama" @@ -131,6 +131,16 @@ files = [ {file = "typing_extensions-4.7.1.tar.gz", hash = "sha256:b75ddc264f0ba5615db7ba217daeb99701ad295353c45f9e95963337ceeeffb2"}, ] +[[package]] +name = "uuid" +version = "1.30" +description = "UUID object and generation functions (Python 2.3 or higher)" +optional = false +python-versions = "*" +files = [ + {file = "uuid-1.30.tar.gz", hash = "sha256:1f87cc004ac5120466f36c5beae48b4c48cc411968eed0eaecd3da82aa96193f"}, +] + [[package]] name = "websockets" version = "11.0.3" @@ -213,4 +223,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "542d5628e562e1d06aba9216b8b4308a7d5723a64a146f5311aae7b18a9e69e5" +content-hash = "d5bdcceb9e4ab6423b4c727ea2a0b4cde830fa00fa0a9f9d72f4e0d9fad77f9e" diff --git a/pyproject.toml b/pyproject.toml index 59adaef..0fc4d8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ python = "^3.8" websockets = "^11.0" python-dateutil = "^2.8.1" typing-extensions = "^4.2.0" +uuid = "^1.30" [tool.poetry.dev-dependencies] pytest = "^7.2.0" diff --git a/realtime/channel.py b/realtime/channel.py index eb5d9c1..3413d34 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -2,6 +2,7 @@ import asyncio import json +import uuid from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple from realtime.message import * @@ -14,6 +15,7 @@ class CallbackListener(NamedTuple): """A tuple with `event` and `callback` """ event: str + ref: str callback: Callback @@ -21,6 +23,7 @@ class Channel: """ `Channel` is an abstraction for a topic listener for an existing socket connection. Each Channel has its own topic and a list of event-callbacks that responds to messages. + A client can also send messages to a channel and register callback when expecting replies. Should only be instantiated through `connection.Socket().set_channel(topic)` Topic-Channel has a 1-many relationship. """ @@ -36,6 +39,8 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.topic = topic self.listeners: List[CallbackListener] = [] self.joined = False + self.join_ref = str(uuid.uuid4()) + self.join_msg_ref = str(uuid.uuid4()) def join(self) -> Channel: """ @@ -56,7 +61,8 @@ async def _join(self) -> None: join_req = dict(topic=self.topic, event=ChannelEvents.join, payload={}, ref=None) elif self.socket.version == 2: - join_req = [None, None, self.topic, ChannelEvents.join, {}] + #[join_reference, message_reference, topic_name, event_name, payload] + join_req = [self.join_ref, self.join_msg_ref, self.topic, ChannelEvents.join, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) @@ -83,7 +89,7 @@ async def _leave(self) -> None: join_req = dict(topic=self.topic, event=ChannelEvents.leave, payload={}, ref=None) elif self.socket.version == 2: - join_req = [None, None, self.topic, ChannelEvents.leave, {}] + join_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) @@ -91,52 +97,56 @@ async def _leave(self) -> None: print(str(e)) # TODO: better error propagation return - def on(self, event: str, callback: Callback) -> Channel: + def on(self, event: str, ref: str, callback: Callback) -> Channel: """ :param event: A specific event will have a specific callback + :param ref: A specific reference that will have a specific callback :param callback: Callback that takes msg payload as its first argument :return: Channel """ - cl = CallbackListener(event=event, callback=callback) + cl = CallbackListener(event=event, ref=ref, callback=callback) self.listeners.append(cl) return self - def off(self, event: str) -> None: + def off(self, event: str, ref: str) -> None: """ :param event: Stop responding to a certain event + :param event: Stop responding to a certain reference :return: None """ self.listeners = [ - callback for callback in self.listeners if callback.event != event] + callback for callback in self.listeners if (callback.event != event and callback.ref != ref)] - def send(self, event_name: str, payload:str) -> None: + def send(self, event_name: str, payload:str, ref: uuid = str(uuid.uuid4())) -> None: """ Wrapper for async def _send() to expose a non-async interface Essentially gets the only event loop and attempt sending a payload to a topic :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. :param payload: The payload to be sent to the phoenix server + :param ref: The message reference that the server will use for replying - if none is set, generates the string repr of a uuidv4 :return: None """ loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._send(event_name, payload)) + loop.run_until_complete(self._send(event_name, payload, ref)) return self - async def _send(self, event_name: str, payload:str) -> None: + async def _send(self, event_name: str, payload: str, ref: str) -> None: """ Coroutine that attempts to join Phoenix Realtime server via a certain topic :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. :param payload: The payload to be sent to the phoenix server + :param ref: The message reference that the server will use for replying :return: None """ if self.socket.version == 1: msg = dict(topic=self.topic, event=event_name, payload=payload, ref=None) elif self.socket.version == 2: - msg = [3, 3, self.topic, event_name, payload] + msg = [None, ref, self.topic, event_name, payload] try: await self.socket.ws_connection.send(json.dumps(msg)) except Exception as e: print(str(e)) # TODO: better error propagation - return + return \ No newline at end of file diff --git a/realtime/connection.py b/realtime/connection.py index 37f9b04..2f101f4 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import pdb from collections import defaultdict from functools import wraps from typing import Any, Callable, List, Dict, TypeVar, DefaultDict @@ -76,15 +77,23 @@ async def _listen(self) -> None: msg = Message(**json.loads(msg)) elif self.version == 2: msg_array = json.loads(msg) - msg = Message(chanid=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) + msg = Message(join_ref=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) if msg.event == ChannelEvents.reply: - continue + for channel in self.channels.get(msg.topic, []): + if msg.ref == channel.join_msg_ref : + logging.info(f"Successfully joined {msg.topic}") + continue + else: + for cl in channel.listeners: + if cl.ref in ["*", msg.ref]: + cl.callback(msg.payload) for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: if cl.event in ["*", msg.event]: cl.callback(msg.payload) + except websockets.exceptions.ConnectionClosed: if self.auto_reconnect: logging.info("Connection with server closed, trying to reconnect...") @@ -155,10 +164,10 @@ def set_channel(self, topic: str) -> Channel: def summary(self) -> None: """ - Prints a list of topics and event the socket is listening to + Prints a list of topics and event, and reference that the socket is listening to :return: None """ for topic, chans in self.channels.items(): for chan in chans: print( - f"Topic: {topic} | Events: {[e for e, _ in chan.listeners]}]") + f"Topic: {topic} | Events: {[e for e, _, _ in chan.listeners]} | References: {[r for _, r, _ in chan.listeners]}]") diff --git a/realtime/message.py b/realtime/message.py index df890b6..87da6e0 100644 --- a/realtime/message.py +++ b/realtime/message.py @@ -11,7 +11,7 @@ class Message: event: str payload: Dict[str, Any] ref: Any - chanid: Any + join_ref: Any topic: str def __hash__(self): From 4b3215bd67df1ecd165c87126b24f6a537fb96ea Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Thu, 5 Oct 2023 11:27:02 +0200 Subject: [PATCH 4/9] add: [leave] leaving a channel --- realtime/channel.py | 17 +++++++++-------- realtime/connection.py | 8 +++++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index 3413d34..8fd673e 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -40,7 +40,7 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.listeners: List[CallbackListener] = [] self.joined = False self.join_ref = str(uuid.uuid4()) - self.join_msg_ref = str(uuid.uuid4()) + self.control_msg_ref = "" def join(self) -> Channel: """ @@ -62,7 +62,8 @@ async def _join(self) -> None: payload={}, ref=None) elif self.socket.version == 2: #[join_reference, message_reference, topic_name, event_name, payload] - join_req = [self.join_ref, self.join_msg_ref, self.topic, ChannelEvents.join, {}] + self.control_msg_ref = str(uuid.uuid4()) + join_req = [self.join_ref, self.control_msg_ref, self.topic, ChannelEvents.join, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) @@ -70,14 +71,14 @@ async def _join(self) -> None: print(str(e)) # TODO: better error propagation return - def leave(self) -> Channel: + def leave(self) -> None: """ Wrapper for async def _leave() to expose a non-async interface Essentially gets the only event loop and attempt leaving a topic - :return: Channel + :return: None """ loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._join()) + loop.run_until_complete(self._leave()) return self async def _leave(self) -> None: @@ -86,13 +87,13 @@ async def _leave(self) -> None: :return: None """ if self.socket.version == 1: - join_req = dict(topic=self.topic, event=ChannelEvents.leave, + leave_req = dict(topic=self.topic, event=ChannelEvents.leave, payload={}, ref=None) elif self.socket.version == 2: - join_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}] + leave_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}] try: - await self.socket.ws_connection.send(json.dumps(join_req)) + await self.socket.ws_connection.send(json.dumps(leave_req)) except Exception as e: print(str(e)) # TODO: better error propagation return diff --git a/realtime/connection.py b/realtime/connection.py index 2f101f4..3518749 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -81,13 +81,19 @@ async def _listen(self) -> None: if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): - if msg.ref == channel.join_msg_ref : + if msg.ref == channel.control_msg_ref : logging.info(f"Successfully joined {msg.topic}") continue else: for cl in channel.listeners: if cl.ref in ["*", msg.ref]: cl.callback(msg.payload) + + if msg.event == ChannelEvents.close: + for channel in self.channels.get(msg.topic, []): + if msg.join_ref == channel.join_ref : + logging.info(f"Successfully left {msg.topic}") + continue for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: From ca06e8ed518732a623b8fa950fc130f4e8fe1b81 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 17 Oct 2023 10:54:55 +0200 Subject: [PATCH 5/9] chg: [logging] handling control msg --- realtime/connection.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 3518749..ff8c038 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -82,8 +82,15 @@ async def _listen(self) -> None: if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): if msg.ref == channel.control_msg_ref : - logging.info(f"Successfully joined {msg.topic}") - continue + if msg.payload == "{'status': 'error', 'response': {'reason': 'sink not found'}}": + logging.info(f"{msg.topic} not found") + break + elif msg.payload == "{'status': 'error', 'response': {'reason': 'unauthorized'}}": + logging.info(f"{msg.topic} unauthorized") + break + elif msg.payload == "{'status': 'ok', 'response': {}}": + logging.info(f"Successfully joined {msg.topic}") + continue else: for cl in channel.listeners: if cl.ref in ["*", msg.ref]: From 172f9c1732f3d97d2548d7152ee327555cc7eb40 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 17 Oct 2023 11:24:31 +0200 Subject: [PATCH 6/9] chg: [logging] handling control msg - again --- realtime/connection.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index ff8c038..e270ba8 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -82,13 +82,10 @@ async def _listen(self) -> None: if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): if msg.ref == channel.control_msg_ref : - if msg.payload == "{'status': 'error', 'response': {'reason': 'sink not found'}}": - logging.info(f"{msg.topic} not found") + if msg.payload["status"] == "error": + logging.info(f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}") break - elif msg.payload == "{'status': 'error', 'response': {'reason': 'unauthorized'}}": - logging.info(f"{msg.topic} unauthorized") - break - elif msg.payload == "{'status': 'ok', 'response': {}}": + elif msg.payload["status"] == "ok": logging.info(f"Successfully joined {msg.topic}") continue else: From ccc9e7873a9c341b8dbbbdb4aebb1cb4b80ac73d Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 18 Oct 2023 11:30:18 +0200 Subject: [PATCH 7/9] chg: [connection] leave on kb interrupt --- realtime/channel.py | 2 +- realtime/connection.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index 8fd673e..360ce01 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -83,7 +83,7 @@ def leave(self) -> None: async def _leave(self) -> None: """ - Coroutine that attempts to join Phoenix Realtime server via a certain topic + Coroutine that attempts to leave Phoenix Realtime server via a certain topic :return: None """ if self.socket.version == 1: diff --git a/realtime/connection.py b/realtime/connection.py index e270ba8..3b5b2aa 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -62,8 +62,16 @@ def listen(self) -> None: :return: None """ loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(asyncio.gather( - self._listen(), self._keep_alive())) + loop.create_task(self._listen()) + loop.create_task(self._keep_alive()) + try: + loop.run_forever() + except KeyboardInterrupt: + # we leave all channels properly + for channel in self.channels: + for chan in self.channels.get(channel, []): + chan.leave() + async def _listen(self) -> None: """ From 3e4424db461573f2cc9df5db36338bdc7b8fce60 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Fri, 20 Oct 2023 23:18:18 +0200 Subject: [PATCH 8/9] chg: [async] remove non-async API --- async-usage.py | 55 ++++++++++++++++++++++++++++++++++++++++++ realtime/channel.py | 42 +++----------------------------- realtime/connection.py | 49 ++++++++++++++----------------------- 3 files changed, 77 insertions(+), 69 deletions(-) create mode 100644 async-usage.py diff --git a/async-usage.py b/async-usage.py new file mode 100644 index 0000000..880ddd0 --- /dev/null +++ b/async-usage.py @@ -0,0 +1,55 @@ +from realtime.connection import Socket +import asyncio +import uuid + +def callback1(payload): + print(f"c1: {payload}") + +def callback2(payload): + print(f"c2: {payload}") + + +async def main(): + + TOKEN = "" + URLsink = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" + + client = Socket(URLsink) + + await client.connect() + + # fire and forget the listening routine + listen_task = asyncio.ensure_future(client.listen()) + + channel_s = client.set_channel("yourchannel") + await channel_s.join() + channel_s.on("test_event", None, callback1) + + # non sense elixir handler, we would not have an event on a reply + #def handle_in("request_ping", payload, socket) do + # push(socket, "test_event", %{body: payload}) + # {:noreply, socket} + #end + + await channel_s.send("request_ping", "this is my payload 1", None) + await channel_s.send("request_ping", "this is my payload 2", None) + await channel_s.send("request_ping", "this is my payload 3", None) + + # proper relpy elixir handler + #def handle_in("ping", payload, socket) do + # {:reply, {:ok, payload}, socket} + #end + + ref = str(uuid.uuid4()) + channel_s.on(None, ref, callback2) + await channel_s.send("ping", "this is my ping payload", ref) + + # we give it some time to complete + await asyncio.sleep(15) + + # proper shut down + listen_task.cancel() + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/realtime/channel.py b/realtime/channel.py index 360ce01..a8bbed2 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -1,6 +1,6 @@ from __future__ import annotations -import asyncio +import logging import json import uuid from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple @@ -42,17 +42,7 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.join_ref = str(uuid.uuid4()) self.control_msg_ref = "" - def join(self) -> Channel: - """ - Wrapper for async def _join() to expose a non-async interface - Essentially gets the only event loop and attempt joining a topic - :return: Channel - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._join()) - return self - - async def _join(self) -> None: + async def join(self) -> None: """ Coroutine that attempts to join Phoenix Realtime server via a certain topic :return: None @@ -71,17 +61,7 @@ async def _join(self) -> None: print(str(e)) # TODO: better error propagation return - def leave(self) -> None: - """ - Wrapper for async def _leave() to expose a non-async interface - Essentially gets the only event loop and attempt leaving a topic - :return: None - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._leave()) - return self - - async def _leave(self) -> None: + async def leave(self) -> None: """ Coroutine that attempts to leave Phoenix Realtime server via a certain topic :return: None @@ -118,21 +98,7 @@ def off(self, event: str, ref: str) -> None: self.listeners = [ callback for callback in self.listeners if (callback.event != event and callback.ref != ref)] - def send(self, event_name: str, payload:str, ref: uuid = str(uuid.uuid4())) -> None: - """ - Wrapper for async def _send() to expose a non-async interface - Essentially gets the only event loop and attempt sending a payload - to a topic - :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. - :param payload: The payload to be sent to the phoenix server - :param ref: The message reference that the server will use for replying - if none is set, generates the string repr of a uuidv4 - :return: None - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._send(event_name, payload, ref)) - return self - - async def _send(self, event_name: str, payload: str, ref: str) -> None: + async def send(self, event_name: str, payload: str, ref: str) -> None: """ Coroutine that attempts to join Phoenix Realtime server via a certain topic :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. diff --git a/realtime/connection.py b/realtime/connection.py index 3b5b2aa..25b3ed8 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -48,36 +48,20 @@ def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any self.params = params self.hb_interval = hb_interval self.ws_connection: websockets.client.WebSocketClientProtocol - self.kept_alive = False + self.kept_alive = set() self.auto_reconnect = auto_reconnect self.version = version self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) @ensure_connection - def listen(self) -> None: - """ - Wrapper for async def _listen() to expose a non-async interface - In most cases, this should be the last method executed as it starts an infinite listening loop. - :return: None - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.create_task(self._listen()) - loop.create_task(self._keep_alive()) - try: - loop.run_forever() - except KeyboardInterrupt: - # we leave all channels properly - for channel in self.channels: - for chan in self.channels.get(channel, []): - chan.leave() - - - async def _listen(self) -> None: + async def listen(self) -> None: """ An infinite loop that keeps listening. :return: None """ + self.kept_alive.add(asyncio.ensure_future(self.keep_alive())) + while True: try: msg = await self.ws_connection.recv() @@ -86,7 +70,6 @@ async def _listen(self) -> None: elif self.version == 2: msg_array = json.loads(msg) msg = Message(join_ref=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) - if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): if msg.ref == channel.control_msg_ref : @@ -123,25 +106,29 @@ async def _listen(self) -> None: logging.exception("Connection with the server closed.") break - def connect(self) -> None: - """ - Wrapper for async def _connect() to expose a non-async interface - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running - loop.run_until_complete(self._connect()) - self.connected = True + except asyncio.CancelledError: + logging.info("Listen task was cancelled.") + await self.leave_all() + + except Exception as e: + logging.error(f"Unexpected error in listen: {e}") - async def _connect(self) -> None: + async def connect(self) -> None: ws_connection = await websockets.connect(self.url) if ws_connection.open: - logging.info("Connection was successful") self.ws_connection = ws_connection self.connected = True + logging.info("Connection was successful") else: raise Exception("Connection Failed") + + async def leave_all(self) -> None: + for channel in self.channels: + for chan in self.channels.get(channel, []): + await chan.leave() - async def _keep_alive(self) -> None: + async def keep_alive(self) -> None: """ Sending heartbeat to server every 5 seconds Ping - pong messages to verify connection is alive From 302a60c3eeffed15fb7e573c321234be541defe2 Mon Sep 17 00:00:00 2001 From: Sourcery AI <> Date: Fri, 20 Oct 2023 21:19:46 +0000 Subject: [PATCH 9/9] 'Refactored by Sourcery' --- realtime/channel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index a8bbed2..dc47164 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -58,7 +58,7 @@ async def join(self) -> None: try: await self.socket.ws_connection.send(json.dumps(join_req)) except Exception as e: - print(str(e)) # TODO: better error propagation + print(e) return async def leave(self) -> None: @@ -75,7 +75,7 @@ async def leave(self) -> None: try: await self.socket.ws_connection.send(json.dumps(leave_req)) except Exception as e: - print(str(e)) # TODO: better error propagation + print(e) return def on(self, event: str, ref: str, callback: Callback) -> Channel: @@ -115,5 +115,5 @@ async def send(self, event_name: str, payload: str, ref: str) -> None: try: await self.socket.ws_connection.send(json.dumps(msg)) except Exception as e: - print(str(e)) # TODO: better error propagation + print(e) return \ No newline at end of file