Skip to content

Commit

Permalink
Use private realay enpoint (#97)
Browse files Browse the repository at this point in the history
* feat: use private ws endpoint

* chore: bump `min_lnbits_version`

* fix: retry logic

* fix: restart logic

* chore: fux log message
  • Loading branch information
motorina0 authored Jan 22, 2024
1 parent 1fd9da8 commit bdf0b77
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 54 deletions.
1 change: 0 additions & 1 deletion __init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ async def _subscribe_to_nostr_client():
# wait for 'nostrclient' extension to initialize
await asyncio.sleep(10)
await nostr_client.run_forever()
raise ValueError("Must reconnect to websocket")

async def _wait_for_nostr_events():
# wait for this extension to initialize
Expand Down
2 changes: 1 addition & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
"short_description": "Nostr Webshop/market on LNbits",
"tile": "/nostrmarket/static/images/bitcoin-shop.png",
"contributors": [],
"min_lnbits_version": "0.11.0"
"min_lnbits_version": "0.12.0"
}
96 changes: 49 additions & 47 deletions nostr/nostr_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from websocket import WebSocketApp

from lnbits.app import settings
from lnbits.helpers import urlsafe_short_hash
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash

from .event import NostrEvent

Expand All @@ -19,23 +19,22 @@ def __init__(self):
self.send_req_queue: Queue = Queue()
self.ws: WebSocketApp = None
self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
self.running = False

async def connect_to_nostrclient_ws(
self, on_open: Callable, on_message: Callable
) -> WebSocketApp:
def on_error(_, error):
logger.warning(error)
self.send_req_queue.put_nowait(ValueError("Websocket error."))
self.recieve_event_queue.put_nowait(ValueError("Websocket error."))
@property
def is_websocket_connected(self):
if not self.ws:
return False
return self.ws.keep_running

async def connect_to_nostrclient_ws(self) -> WebSocketApp:
logger.debug(f"Connecting to websockets for 'nostrclient' extension...")

def on_close(_, status_code, message):
logger.warning(f"Websocket closed: '{status_code}' '{message}'")
self.send_req_queue.put_nowait(ValueError("Websocket close."))
self.recieve_event_queue.put_nowait(ValueError("Websocket close."))

logger.debug(f"Subscribing to websockets for nostrclient extension")
relay_endpoint = encrypt_internal_message("relay")
on_open, on_message, on_error, on_close = self._ws_handlers()
ws = WebSocketApp(
f"ws://localhost:{settings.port}/nostrclient/api/v1/relay",
f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}",
on_message=on_message,
on_open=on_open,
on_close=on_close,
Expand All @@ -48,42 +47,28 @@ def on_close(_, status_code, message):

return ws

async def get_event(self):
value = await self.recieve_event_queue.get()
if isinstance(value, ValueError):
raise value
return value

async def run_forever(self):
def on_open(_):
logger.info("Connected to 'nostrclient' websocket")

def on_message(_, message):
self.recieve_event_queue.put_nowait(message)

self._safe_ws_stop()
running = True

while running:
self.running = True
while self.running:
try:
req = None
if not self.ws:
self.ws = await self.connect_to_nostrclient_ws(on_open, on_message)
if not self.is_websocket_connected:
self.ws = await self.connect_to_nostrclient_ws()
# be sure the connection is open
await asyncio.sleep(3)
req = await self.send_req_queue.get()
await asyncio.sleep(5)

if isinstance(req, ValueError):
running = False
logger.warning(str(req))
else:
self.ws.send(json.dumps(req))
req = await self.send_req_queue.get()
self.ws.send(json.dumps(req))
except Exception as ex:
logger.warning(ex)
if req:
await self.send_req_queue.put(req)
self._safe_ws_stop()
await asyncio.sleep(5)
await asyncio.sleep(60)


async def get_event(self):
value = await self.recieve_event_queue.get()
if isinstance(value, ValueError):
raise value
return value


async def publish_nostr_event(self, e: NostrEvent):
await self.send_req_queue.put(["EVENT", e.dict()])
Expand All @@ -109,7 +94,7 @@ async def subscribe_merchants(
await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters)

logger.debug(
f"Subscribed to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}"
f"Subscribing to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}"
)

async def merchant_temp_subscription(self, pk, duration=10):
Expand Down Expand Up @@ -190,19 +175,36 @@ def _safe_ws_stop(self):
pass
self.ws = None

def _ws_handlers(self):
def on_open(_):
logger.info("Connected to 'nostrclient' websocket")

def on_message(_, message):
self.recieve_event_queue.put_nowait(message)

def on_error(_, error):
logger.warning(error)

def on_close(x, status_code, message):
logger.warning(f"Websocket closed: {x}: '{status_code}' '{message}'")
# force re-subscribe
self.recieve_event_queue.put_nowait(ValueError("Websocket close."))

return on_open, on_message, on_error, on_close

async def restart(self):
await self.unsubscribe_merchants()
# Give some time for the CLOSE events to propagate before restarting
await asyncio.sleep(10)

logger.info("Restating NostrClient...")
await self.send_req_queue.put(ValueError("Restarting NostrClient..."))
logger.info("Restarting NostrClient...")
await self.recieve_event_queue.put(ValueError("Restarting NostrClient..."))

self._safe_ws_stop()

async def stop(self):
await self.unsubscribe_merchants()
self.running = False

# Give some time for the CLOSE events to propagate before closing the connection
await asyncio.sleep(10)
Expand Down
17 changes: 12 additions & 5 deletions tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from asyncio import Queue
import asyncio

from loguru import logger

from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
Expand Down Expand Up @@ -33,9 +36,13 @@ async def on_invoice_paid(payment: Payment) -> None:


async def wait_for_nostr_events(nostr_client: NostrClient):

await subscribe_to_all_merchants()

while True:
message = await nostr_client.get_event()
await process_nostr_message(message)
try:
await subscribe_to_all_merchants()

while True:
message = await nostr_client.get_event()
await process_nostr_message(message)
except Exception as e:
logger.warning(f"Subcription failed. Will retry in one minute: {e}")
await asyncio.sleep(10)

0 comments on commit bdf0b77

Please sign in to comment.