diff --git a/__init__.py b/__init__.py index 32d8718..5e1e46d 100644 --- a/__init__.py +++ b/__init__.py @@ -41,7 +41,6 @@ async def _subscribe_to_nostr_client(): # wait for 'nostrclient' extension to initialize await asyncio.sleep(10) await nostr_client.run_forever() - raise ValueError("Must reconnect to websocket") async def _wait_for_nostr_events(): # wait for this extension to initialize diff --git a/config.json b/config.json index 2f6c72b..54b3494 100644 --- a/config.json +++ b/config.json @@ -3,5 +3,5 @@ "short_description": "Nostr Webshop/market on LNbits", "tile": "/nostrmarket/static/images/bitcoin-shop.png", "contributors": [], - "min_lnbits_version": "0.11.0" + "min_lnbits_version": "0.12.0" } diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index b7a1a9e..7353e44 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -8,7 +8,7 @@ from websocket import WebSocketApp from lnbits.app import settings -from lnbits.helpers import urlsafe_short_hash +from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash from .event import NostrEvent @@ -19,23 +19,22 @@ def __init__(self): self.send_req_queue: Queue = Queue() self.ws: WebSocketApp = None self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32] + self.running = False - async def connect_to_nostrclient_ws( - self, on_open: Callable, on_message: Callable - ) -> WebSocketApp: - def on_error(_, error): - logger.warning(error) - self.send_req_queue.put_nowait(ValueError("Websocket error.")) - self.recieve_event_queue.put_nowait(ValueError("Websocket error.")) + @property + def is_websocket_connected(self): + if not self.ws: + return False + return self.ws.keep_running + + async def connect_to_nostrclient_ws(self) -> WebSocketApp: + logger.debug(f"Connecting to websockets for 'nostrclient' extension...") - def on_close(_, status_code, message): - logger.warning(f"Websocket closed: '{status_code}' '{message}'") - self.send_req_queue.put_nowait(ValueError("Websocket close.")) - self.recieve_event_queue.put_nowait(ValueError("Websocket close.")) - logger.debug(f"Subscribing to websockets for nostrclient extension") + relay_endpoint = encrypt_internal_message("relay") + on_open, on_message, on_error, on_close = self._ws_handlers() ws = WebSocketApp( - f"ws://localhost:{settings.port}/nostrclient/api/v1/relay", + f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}", on_message=on_message, on_open=on_open, on_close=on_close, @@ -48,42 +47,28 @@ def on_close(_, status_code, message): return ws - async def get_event(self): - value = await self.recieve_event_queue.get() - if isinstance(value, ValueError): - raise value - return value - async def run_forever(self): - def on_open(_): - logger.info("Connected to 'nostrclient' websocket") - - def on_message(_, message): - self.recieve_event_queue.put_nowait(message) - - self._safe_ws_stop() - running = True - - while running: + self.running = True + while self.running: try: - req = None - if not self.ws: - self.ws = await self.connect_to_nostrclient_ws(on_open, on_message) + if not self.is_websocket_connected: + self.ws = await self.connect_to_nostrclient_ws() # be sure the connection is open - await asyncio.sleep(3) - req = await self.send_req_queue.get() + await asyncio.sleep(5) - if isinstance(req, ValueError): - running = False - logger.warning(str(req)) - else: - self.ws.send(json.dumps(req)) + req = await self.send_req_queue.get() + self.ws.send(json.dumps(req)) except Exception as ex: logger.warning(ex) - if req: - await self.send_req_queue.put(req) - self._safe_ws_stop() - await asyncio.sleep(5) + await asyncio.sleep(60) + + + async def get_event(self): + value = await self.recieve_event_queue.get() + if isinstance(value, ValueError): + raise value + return value + async def publish_nostr_event(self, e: NostrEvent): await self.send_req_queue.put(["EVENT", e.dict()]) @@ -109,7 +94,7 @@ async def subscribe_merchants( await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters) logger.debug( - f"Subscribed to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}" + f"Subscribing to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}" ) async def merchant_temp_subscription(self, pk, duration=10): @@ -190,19 +175,36 @@ def _safe_ws_stop(self): pass self.ws = None + def _ws_handlers(self): + def on_open(_): + logger.info("Connected to 'nostrclient' websocket") + + def on_message(_, message): + self.recieve_event_queue.put_nowait(message) + + def on_error(_, error): + logger.warning(error) + + def on_close(x, status_code, message): + logger.warning(f"Websocket closed: {x}: '{status_code}' '{message}'") + # force re-subscribe + self.recieve_event_queue.put_nowait(ValueError("Websocket close.")) + + return on_open, on_message, on_error, on_close + async def restart(self): await self.unsubscribe_merchants() # Give some time for the CLOSE events to propagate before restarting await asyncio.sleep(10) - logger.info("Restating NostrClient...") - await self.send_req_queue.put(ValueError("Restarting NostrClient...")) + logger.info("Restarting NostrClient...") await self.recieve_event_queue.put(ValueError("Restarting NostrClient...")) self._safe_ws_stop() async def stop(self): await self.unsubscribe_merchants() + self.running = False # Give some time for the CLOSE events to propagate before closing the connection await asyncio.sleep(10) diff --git a/tasks.py b/tasks.py index 0813936..4f50ebe 100644 --- a/tasks.py +++ b/tasks.py @@ -1,4 +1,7 @@ from asyncio import Queue +import asyncio + +from loguru import logger from lnbits.core.models import Payment from lnbits.tasks import register_invoice_listener @@ -33,9 +36,13 @@ async def on_invoice_paid(payment: Payment) -> None: async def wait_for_nostr_events(nostr_client: NostrClient): - - await subscribe_to_all_merchants() - while True: - message = await nostr_client.get_event() - await process_nostr_message(message) \ No newline at end of file + try: + await subscribe_to_all_merchants() + + while True: + message = await nostr_client.get_event() + await process_nostr_message(message) + except Exception as e: + logger.warning(f"Subcription failed. Will retry in one minute: {e}") + await asyncio.sleep(10)