Skip to content

Commit 330f056

Browse files
committed
Added otlp instrumentation.
1 parent 48e27e2 commit 330f056

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1118
-485
lines changed

.github/workflows/test.yml

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,25 @@ jobs:
1414
- black
1515
- ruff
1616
- mypy
17+
- stubtest
1718
runs-on: ubuntu-latest
1819
steps:
1920
- uses: actions/checkout@v6
2021
- name: Set up Python
2122
uses: actions/setup-python@v4
2223
with:
23-
python-version: "3.11"
24+
python-version: "3.x"
25+
- uses: actions-rs/toolchain@v1
26+
with:
27+
toolchain: stable
28+
components: clippy
29+
override: true
30+
- name: Install uv
31+
uses: astral-sh/setup-uv@v7
2432
- name: Run lint check
2533
uses: pre-commit/action@v3.0.0
2634
with:
27-
extra_args: -a ${{ matrix.cmd }}
35+
extra_args: -a -v ${{ matrix.cmd }}
2836
fmt:
2937
runs-on: ubuntu-latest
3038
steps:
@@ -52,33 +60,6 @@ jobs:
5260
with:
5361
token: ${{secrets.GITHUB_TOKEN}}
5462
deny: warnings
55-
stubtest:
56-
runs-on: ubuntu-latest
57-
steps:
58-
- uses: actions/checkout@v6
59-
- uses: actions-rs/toolchain@v1
60-
with:
61-
toolchain: stable
62-
components: clippy
63-
override: true
64-
- uses: actions/setup-python@v6
65-
with:
66-
python-version: 3.x
67-
- name: Install uv
68-
uses: astral-sh/setup-uv@v7
69-
- id: setup-venv
70-
name: Setup virtualenv
71-
run: python -m venv .venv
72-
- name: Build lib
73-
uses: PyO3/maturin-action@v1
74-
with:
75-
command: dev --uv
76-
sccache: true
77-
- name: Run stubtest
78-
run: |
79-
set -e
80-
source .venv/bin/activate
81-
stubtest --ignore-disjoint-bases natsrpy
8263
pytest:
8364
runs-on: ubuntu-latest
8465
steps:

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/target
2+
delete-me-*
23

34
# Byte-compiled / optimized / DLL files
45
__pycache__/

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ repos:
1818
name: python mypy
1919
always_run: true
2020
pass_filenames: false
21-
args: ["python"]
21+
args: ["python", "examples"]
2222
- repo: https://github.com/astral-sh/ruff-pre-commit
2323
rev: v0.15.7
2424
hooks:

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async-nats = "0.46"
1818
bytes = "1.11.1"
1919
futures-util = "0.3.32"
2020
log = "0.4.29"
21-
pyo3 = { version = "0.28", features = ["experimental-inspect"] }
21+
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
2222
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
2323
pyo3-log = "0.13.3"
2424
serde = { version = "1.0.228", features = ["derive"] }

examples/callback_subscriptions.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import asyncio
2+
3+
from natsrpy import Message, Nats
4+
5+
6+
async def main() -> None:
7+
"""Main function to run the example."""
8+
nats = Nats(["nats://localhost:4222"])
9+
await nats.startup()
10+
11+
async def callback(message: Message) -> None:
12+
print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201
13+
14+
# For callback subscriptions you can use detatch method.
15+
#
16+
# This method does the same as __enter__, however since
17+
# it's a callback-based subscription, context managers
18+
# are ususally not needed.
19+
#
20+
# But please save the reference somewhere, since python garbage
21+
# collector might collect your detatched subscription and
22+
# stop receiving any new messages.
23+
cb_sub = await nats.subscribe("cb-subj", callback=callback).detatch()
24+
await cb_sub.unsubscribe(limit=1)
25+
26+
nats.publish("cb-subj", "message for callback")
27+
28+
# Waiting for subscriber to read all the messages.
29+
await cb_sub.wait()
30+
31+
# Don't forget to call shutdown.
32+
await nats.shutdown()
33+
34+
35+
if __name__ == "__main__":
36+
asyncio.run(main())

examples/consumers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ async def main() -> None:
4444
# We use messages() to get async iterator which we
4545
# use to get messages for push_consumer.
4646
async for push_message in await push_consumer.messages():
47-
print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201
47+
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
4848
await push_message.ack()
4949
break
5050

5151
# Pull consumers have to request batches of messages.
5252
for pull_message in await pull_consumer.fetch(max_messages=10):
53-
print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201
53+
print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201
5454
await pull_message.ack()
5555

5656
# Cleanup

examples/kv.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ async def main() -> None:
3030

3131
await kv.delete("test-key")
3232

33-
# Alternatively you can
34-
# use await watcher.next()
3533
async for event in watcher:
3634
print("[EVENT]", event) # noqa: T201
3735
break

examples/request_reply.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,22 @@ async def main() -> None:
1212
# Here we create responder, that will be
1313
# answering to our requests.
1414
async def responder(message: Message) -> None:
15-
print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201
15+
print(f"[REQUEST]: {message.payload!r}, headers={message.headers}") # noqa: T201
1616
if message.reply:
1717
await nats.publish(
1818
message.reply,
19-
f"reply to {message.payload}",
19+
f"reply to {message.payload!r}",
2020
headers=message.headers,
2121
)
2222

2323
# Start responder using callback-based subsciption.
24-
sub = await nats.subscribe(subj, callback=responder)
25-
# Send 3 concurrent requests.
26-
responses = await asyncio.gather(
27-
nats.request(subj, "request1"),
28-
nats.request(subj, "request2", headers={"header": "value"}),
29-
nats.request(subj, "request3", inbox="test-inbox"),
30-
)
31-
# Disconnect resonder.
32-
await sub.drain()
24+
async with nats.subscribe(subj, callback=responder):
25+
# Send 3 concurrent requests.
26+
responses = await asyncio.gather(
27+
nats.request(subj, "request1"),
28+
nats.request(subj, "request2", headers={"header": "value"}),
29+
nats.request(subj, "request3", inbox="test-inbox"),
30+
)
3331

3432
# Iterate over replies.
3533
for resp in responses:

examples/simple_publish.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,20 @@ async def main() -> None:
1111
# Here we initiate subscription.
1212
# We do it before sending messages,
1313
# in order to catch them once we will start reading.
14-
subscription = await nats.subscribe("hello")
15-
16-
# Publish accepts str | bytes | bytearray | memoryview
17-
await nats.publish("hello", "str world")
18-
await nats.publish("hello", b"bytes world")
19-
await nats.publish("hello", bytearray(b"bytearray world"))
20-
await nats.publish("hello", "headers", headers={"one": "two"})
21-
await nats.publish("hello", "headers", headers={"one": ["two", "three"]})
22-
23-
# Calling this method will unsubscribe us,
24-
# after `n` delivered messages.
25-
# or immediately if `n` is not provided.
26-
subscription.unsubscribe(limit=5)
27-
async for message in subscription:
28-
print(message) # noqa: T201
14+
async with nats.subscribe("hello") as subscription:
15+
# Publish accepts str | bytes | bytearray | memoryview
16+
await nats.publish("hello", "str world")
17+
await nats.publish("hello", b"bytes world")
18+
await nats.publish("hello", bytearray(b"bytearray world"))
19+
await nats.publish("hello", "headers", headers={"one": "two"})
20+
await nats.publish("hello", "headers", headers={"one": ["two", "three"]})
21+
22+
# Calling this method will unsubscribe us,
23+
# after `n` delivered messages.
24+
# or immediately if `n` is not provided.
25+
subscription.unsubscribe(limit=5)
26+
async for message in subscription:
27+
print(message) # noqa: T201
2928

3029
# Don't forget to call shutdown.
3130
await nats.shutdown()

examples/subscriptions.py

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,42 +11,39 @@ async def main() -> None:
1111
cb_lock = asyncio.Event()
1212

1313
async def callback(message: Message) -> None:
14-
print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201
14+
print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201
1515
cb_lock.set()
1616

17-
# When subscribing you can set callback.
18-
# In that case CallbackSubscription is returned.
19-
# This type of subscription cannot be iterated.
20-
cb_sub = await nats.subscribe("cb-subj", callback=callback)
21-
22-
# When callback is not set, you get a subscription
23-
# that should be used along with `async for`
24-
# loop, or alternatively you can call
25-
# `await iter_sub.next()` to get a single message.
26-
iter_sub = await nats.subscribe("iter-subj")
27-
28-
# Subscriptions with queue argument create
29-
# subscription with a queue group to distribute
30-
# messages along all subscribers.
31-
queue_sub = await nats.subscribe("queue-subj", queue="example-queue")
32-
33-
await nats.publish("cb-subj", "message for callback")
34-
await nats.publish("iter-subj", "message for iterator")
35-
await nats.publish("queue-subj", "message for queue sub")
36-
37-
# We can unsubscribe after a particular amount of messages.
38-
await iter_sub.unsubscribe(limit=1)
39-
await cb_sub.unsubscribe(limit=1)
40-
await queue_sub.unsubscribe(limit=1)
41-
42-
async for message in iter_sub:
43-
print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201
44-
45-
async for message in queue_sub:
46-
print(f"[FROM_QUEUED] {message.payload}") # noqa: T201
47-
48-
# Making sure that the message in callback is received.
49-
await cb_lock.wait()
17+
async with (
18+
# When subscribing you can set callback.
19+
# In that case CallbackSubscription is returned.
20+
# This type of subscription cannot be iterated.
21+
nats.subscribe("cb-subj", callback=callback) as cb_sub,
22+
# When callback is not set, you get a subscription
23+
# that should be used along with `async for`
24+
nats.subscribe("iter-subj") as iter_sub,
25+
# Subscriptions with queue argument create
26+
# subscription with a queue group to distribute
27+
# messages along all subscribers.
28+
nats.subscribe("queue-subj", queue="example-queue") as queue_sub,
29+
):
30+
await nats.publish("cb-subj", "message for callback")
31+
await nats.publish("iter-subj", "message for iterator")
32+
await nats.publish("queue-subj", "message for queue sub")
33+
34+
# We can unsubscribe after a particular amount of messages.
35+
await iter_sub.unsubscribe(limit=1)
36+
await cb_sub.unsubscribe(limit=1)
37+
await queue_sub.unsubscribe(limit=1)
38+
39+
async for message in iter_sub:
40+
print(f"[FROM_ITERATOR] {message.payload!r}") # noqa: T201
41+
42+
async for message in queue_sub:
43+
print(f"[FROM_QUEUED] {message.payload!r}") # noqa: T201
44+
45+
# Making sure that the message in callback is received.
46+
await cb_lock.wait()
5047

5148
# Don't forget to call shutdown.
5249
await nats.shutdown()

0 commit comments

Comments
 (0)