Skip to content

Commit 9e2e95e

Browse files
committed
Add nats-subscribe for NATS PubSub messages
Signed-off-by: Chris Boot <[email protected]>
1 parent 30c3a37 commit 9e2e95e

File tree

4 files changed

+166
-0
lines changed

4 files changed

+166
-0
lines changed

nats-subscribe/Dockerfile

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
FROM debian:bookworm-slim
2+
3+
WORKDIR /app
4+
5+
ADD requirements.txt .
6+
7+
# Doing all this with a debian:bookworm-slim image leads to a ~190MB image.
8+
# With python:3.11 and "pip install -r requirements.txt" you get an image
9+
# that's over 1GB. python:3.11-slim is missing gcc which is needed for ed25519,
10+
# which is required for NKEY / JWT authentication support.
11+
RUN apt-get update && \
12+
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
13+
build-essential \
14+
python3-dev \
15+
python3-pip \
16+
&& \
17+
pip install --break-system-packages -r requirements.txt && \
18+
apt-get purge -y --auto-remove --purge \
19+
build-essential \
20+
python3-dev \
21+
&& \
22+
find \
23+
/var/cache/apt \
24+
/var/cache/ldconfig \
25+
/var/lib/apt/lists \
26+
-mindepth 1 -delete && \
27+
rm -rf /root/.cache
28+
29+
ADD main.py .
30+
31+
CMD ["python3", "main.py"]

nats-subscribe/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Usage
2+
3+
Listen on a NATS PubSub subject and send the webhook to a URL.
4+
5+
```
6+
NATS_URL=nats://localhost:4222 SUBJECT=mysubject WEBHOOK=https://app.windmill.dev/api/w/github-sync-example/jobs/run/p/f/examples/query_postgres python3 nats-subscribe/main.py
7+
```
8+
9+
To test, run the following in a shell:
10+
11+
```
12+
NATS_URL=nats://localhost:4222 nats publish mysubject '{"foo": 42}'
13+
```
14+
15+
Note that the subject of the received message will be included in an
16+
`X-NATS-Subject` header. This can be useful in combination with Windmill's
17+
[request headers](https://www.windmill.dev/docs/core_concepts/webhooks#request-headers)
18+
functionality.

nats-subscribe/main.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#!/usr/bin/env python3
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
# Copyright 2023 Tiger Computing Ltd
5+
# Author: Chris Boot <[email protected]>
6+
7+
"""
8+
NATS PubSub to HTTP Webhook adapter.
9+
"""
10+
11+
import asyncio
12+
import os
13+
import re
14+
import ssl
15+
from functools import partial
16+
from types import SimpleNamespace
17+
18+
import aiohttp
19+
20+
import nats
21+
22+
DEFAULT_URL = "nats://localhost:4222"
23+
DEFAULT_SUBJECT = "test"
24+
DEFAULT_WEBHOOK = "http://localhost:8080"
25+
26+
cfg = SimpleNamespace()
27+
28+
29+
async def configure() -> None:
30+
envs = {
31+
"NATS_URL",
32+
"NATS_USER",
33+
"NATS_PASSWORD",
34+
"NATS_CREDS",
35+
"NATS_NKEY",
36+
"NATS_CERT",
37+
"NATS_KEY",
38+
"NATS_CA",
39+
"NATS_TIMEOUT",
40+
"SUBJECT",
41+
"WEBHOOK",
42+
}
43+
44+
for env in envs:
45+
setattr(cfg, env.lower(), os.environ.get(env))
46+
47+
48+
async def connect() -> nats.NATS:
49+
options = {
50+
"servers": re.split(r"[, ]+", cfg.nats_url or DEFAULT_URL),
51+
"user": cfg.nats_user,
52+
"password": cfg.nats_password,
53+
"user_credentials": cfg.nats_creds,
54+
"nkeys_seed": cfg.nats_nkey,
55+
}
56+
57+
if cfg.nats_cert or cfg.nats_key or cfg.nats_ca:
58+
tls = ssl.create_default_context(cafile=cfg.nats_ca)
59+
60+
if cfg.nats_cert or cfg.nats_key:
61+
tls.load_cert_chain(cfg.nats_cert, cfg.nats_key)
62+
63+
if cfg.nats_timeout is not None:
64+
options["connect_timeout"] = int(cfg.nats_timeout)
65+
66+
return await nats.connect(**options)
67+
68+
69+
async def handle_message(
70+
msg: nats.aio.msg.Msg,
71+
session: aiohttp.ClientSession,
72+
) -> None:
73+
subject = msg.subject
74+
data = msg.data.decode()
75+
76+
print(f"Received on '{subject}'")
77+
print(data)
78+
79+
async with session.post(
80+
cfg.webhook or DEFAULT_WEBHOOK,
81+
data=msg.data,
82+
headers={
83+
"Content-Type": "application/json",
84+
"X-NATS-Subject": subject,
85+
},
86+
) as resp:
87+
print(f"Webhook status: {resp.status} {resp.reason}")
88+
89+
async for _ in resp.content.iter_chunks():
90+
# just throw away the response data
91+
pass
92+
93+
94+
async def main() -> None:
95+
await configure()
96+
nc = await connect()
97+
98+
async with aiohttp.ClientSession() as session:
99+
sub = await nc.subscribe(
100+
cfg.subject or DEFAULT_SUBJECT,
101+
cb=partial(handle_message, session=session),
102+
)
103+
print(f"Subscribed on '{sub.subject}'")
104+
105+
try:
106+
# Wait forever
107+
await asyncio.wait_for(asyncio.Future(), timeout=None)
108+
except asyncio.CancelledError:
109+
pass
110+
finally:
111+
await nc.drain()
112+
113+
114+
if __name__ == "__main__":
115+
asyncio.run(main())

nats-subscribe/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
aiohttp[speedups] ~= 3.8
2+
nats-py[nkeys] ~= 2.3

0 commit comments

Comments
 (0)