Skip to content

Commit c86ca99

Browse files
authored
Added missing API methods. (#33)
1 parent 2309ebd commit c86ca99

File tree

9 files changed

+546
-9
lines changed

9 files changed

+546
-9
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name: "Testing package"
22

33
on:
4-
push:
4+
pull_request:
55

66
jobs:
77
py-lint:

python/natsrpy/_natsrpy_rs/js/managers.pyi

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from datetime import timedelta
22
from typing import final, overload
33

4+
from typing_extensions import Self
5+
46
from .consumers import (
57
PullConsumer,
68
PullConsumerConfig,
@@ -13,13 +15,63 @@ from .object_store import ObjectStore, ObjectStoreConfig
1315
from .stream import Stream, StreamConfig
1416

1517
__all__ = [
18+
"ConsumersIterator",
1619
"ConsumersManager",
20+
"ConsumersNamesIterator",
1721
"CountersManager",
1822
"KVManager",
1923
"ObjectStoreManager",
2024
"StreamsManager",
2125
]
2226

27+
@final
28+
class ConsumersIterator:
29+
"""Async iterator over consumers subscribed to a stream.
30+
31+
Returned by :meth:`ConsumersManager.list`.
32+
Consumers can be received using ``async for`` or by calling :meth:`next`
33+
directly.
34+
35+
Consumer type is identified by its config. If it has deliver_subject set,
36+
then PushConsumer is returned.
37+
"""
38+
39+
def __aiter__(self) -> Self: ...
40+
async def __anext__(self) -> PullConsumer | PushConsumer: ...
41+
async def next(
42+
self,
43+
timeout: float | timedelta | None = None,
44+
) -> PullConsumer | PushConsumer:
45+
"""Receive the next consumer from the stream.
46+
47+
:param timeout: maximum time to wait for a message in seconds
48+
or as a timedelta, defaults to None (wait indefinitely).
49+
:return: the next consumer.
50+
:raises StopAsyncIteration: when the subscription is drained or
51+
unsubscribed.
52+
"""
53+
54+
@final
55+
class ConsumersNamesIterator:
56+
"""Async iterator over names of consumers subscribed to a stream.
57+
58+
Returned by :meth:`ConsumersManager.list_names`.
59+
Consumer names can be received using ``async for`` or by calling :meth:`next`
60+
directly.
61+
"""
62+
63+
def __aiter__(self) -> Self: ...
64+
async def __anext__(self) -> str: ...
65+
async def next(self, timeout: float | timedelta | None = None) -> str:
66+
"""Receive the next consumer name from the stream.
67+
68+
:param timeout: maximum time to wait for a message in seconds
69+
or as a timedelta, defaults to None (wait indefinitely).
70+
:return: the next consumer name.
71+
:raises StopAsyncIteration: when the subscription is drained or
72+
unsubscribed.
73+
"""
74+
2375
@final
2476
class StreamsManager:
2577
"""Manager for JetStream stream CRUD operations."""
@@ -185,6 +237,27 @@ class ConsumersManager:
185237
:return: True if the consumer was resumed.
186238
"""
187239

240+
async def list(self) -> ConsumersIterator:
241+
"""List consumers subscribed to the stream.
242+
243+
This method iterates over all consumers on a
244+
stream and retunrns correct types, by looking
245+
at their config.
246+
247+
If you only need names, use :meth:`ConsumersManager.list_names` instead.
248+
249+
:return: an async iterator over consumers.
250+
"""
251+
252+
async def list_names(self) -> ConsumersNamesIterator:
253+
"""List names of consumers subscribed to the stream.
254+
255+
This method iterates over all consumer names on a
256+
stream.
257+
258+
:return: an async iterator over consumer names.
259+
"""
260+
188261
@final
189262
class ObjectStoreManager:
190263
"""Manager for object store bucket operations."""

python/natsrpy/_natsrpy_rs/js/stream.pyi

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,10 @@ class Stream:
467467
accessing messages in the stream, as well as managing consumers.
468468
"""
469469

470+
@property
471+
def consumers(self) -> ConsumersManager:
472+
"""Manager for consumers bound to this stream."""
473+
470474
async def direct_get(
471475
self,
472476
sequence: int,
@@ -479,6 +483,45 @@ class Stream:
479483
:return: the stream message.
480484
"""
481485

486+
async def direct_get_next_for_subject(
487+
self,
488+
subject: str,
489+
sequence: int | None = None,
490+
timeout: float | timedelta | None = None,
491+
) -> StreamMessage:
492+
"""Get the next message for a subject directly from the stream.
493+
494+
:param subject: subject to get the next message for.
495+
:param sequence: optional sequence number to start searching from.
496+
If not provided, starts from the beginning of the stream.
497+
:param timeout: operation timeout.
498+
:return: the next stream message matching the subject filter.
499+
"""
500+
501+
async def direct_get_first_for_subject(
502+
self,
503+
subject: str,
504+
timeout: float | timedelta | None = None,
505+
) -> StreamMessage:
506+
"""Get the first message for a subject directly from the stream.
507+
508+
:param subject: subject to get the first message for.
509+
:param timeout: operation timeout.
510+
:return: the first stream message matching the subject filter.
511+
"""
512+
513+
async def direct_get_last_for_subject(
514+
self,
515+
subject: str,
516+
timeout: float | timedelta | None = None,
517+
) -> StreamMessage:
518+
"""Get the last message for a subject directly from the stream.
519+
520+
:param subject: subject to get the last message for.
521+
:param timeout: operation timeout.
522+
:return: the last stream message matching the subject filter.
523+
"""
524+
482525
async def get_info(self, timeout: float | datetime | None = None) -> StreamInfo:
483526
"""Get information about the stream.
484527
@@ -505,6 +548,13 @@ class Stream:
505548
:return: number of messages purged.
506549
"""
507550

508-
@property
509-
def consumers(self) -> ConsumersManager:
510-
"""Manager for consumers bound to this stream."""
551+
async def delete_message(
552+
self,
553+
sequence: int,
554+
timeout: float | datetime | None = None,
555+
) -> None:
556+
"""Delete a message from the stream by sequence number.
557+
558+
:param sequence: sequence number of the message to delete.
559+
:param timeout: operation timeout.
560+
"""

python/natsrpy/js.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
KVOperation,
2121
KVStatus,
2222
)
23+
from ._natsrpy_rs.js.managers import ConsumersIterator, ConsumersNamesIterator
2324
from ._natsrpy_rs.js.object_store import (
2425
ObjectInfo,
2526
ObjectInfoIterator,
@@ -54,6 +55,8 @@
5455
"ClusterInfo",
5556
"Compression",
5657
"ConsumerLimits",
58+
"ConsumersIterator",
59+
"ConsumersNamesIterator",
5760
"CounterEntry",
5861
"Counters",
5962
"CountersConfig",
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import uuid
2+
3+
from natsrpy.js import (
4+
JetStream,
5+
PullConsumer,
6+
PullConsumerConfig,
7+
PushConsumer,
8+
PushConsumerConfig,
9+
StreamConfig,
10+
)
11+
12+
13+
async def test_stream_direct_get_next_for_subject(js: JetStream) -> None:
14+
name = f"test-dgnfs-{uuid.uuid4().hex[:8]}"
15+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
16+
stream = await js.streams.create(config)
17+
try:
18+
await js.publish(f"{name}.a", b"msg-a-1", wait=True)
19+
await js.publish(f"{name}.a", b"msg-a-2", wait=True)
20+
await js.publish(f"{name}.b", b"msg-b-1", wait=True)
21+
msg = await stream.direct_get_next_for_subject(f"{name}.a")
22+
assert msg.payload == b"msg-a-1"
23+
assert msg.subject == f"{name}.a"
24+
finally:
25+
await js.streams.delete(name)
26+
27+
28+
async def test_stream_direct_get_next_for_subject_with_sequence(
29+
js: JetStream,
30+
) -> None:
31+
name = f"test-dgnfss-{uuid.uuid4().hex[:8]}"
32+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
33+
stream = await js.streams.create(config)
34+
try:
35+
await js.publish(f"{name}.a", b"msg-a-1", wait=True)
36+
await js.publish(f"{name}.a", b"msg-a-2", wait=True)
37+
await js.publish(f"{name}.b", b"msg-b-1", wait=True)
38+
msg = await stream.direct_get_next_for_subject(f"{name}.a", sequence=2)
39+
assert msg.payload == b"msg-a-2"
40+
finally:
41+
await js.streams.delete(name)
42+
43+
44+
async def test_stream_direct_get_first_for_subject(js: JetStream) -> None:
45+
name = f"test-dgffs-{uuid.uuid4().hex[:8]}"
46+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
47+
stream = await js.streams.create(config)
48+
try:
49+
await js.publish(f"{name}.a", b"first-msg", wait=True)
50+
await js.publish(f"{name}.a", b"second-msg", wait=True)
51+
msg = await stream.direct_get_first_for_subject(f"{name}.a")
52+
assert msg.payload == b"first-msg"
53+
assert msg.subject == f"{name}.a"
54+
assert msg.sequence == 1
55+
finally:
56+
await js.streams.delete(name)
57+
58+
59+
async def test_stream_direct_get_last_for_subject(js: JetStream) -> None:
60+
name = f"test-dglfs-{uuid.uuid4().hex[:8]}"
61+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
62+
stream = await js.streams.create(config)
63+
try:
64+
await js.publish(f"{name}.a", b"first-msg", wait=True)
65+
await js.publish(f"{name}.a", b"last-msg", wait=True)
66+
msg = await stream.direct_get_last_for_subject(f"{name}.a")
67+
assert msg.payload == b"last-msg"
68+
assert msg.subject == f"{name}.a"
69+
assert msg.sequence == 2
70+
finally:
71+
await js.streams.delete(name)
72+
73+
74+
async def test_stream_delete_message(js: JetStream) -> None:
75+
name = f"test-delmsg-{uuid.uuid4().hex[:8]}"
76+
subj = f"{name}.data"
77+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
78+
stream = await js.streams.create(config)
79+
try:
80+
await js.publish(subj, b"msg-1", wait=True)
81+
await js.publish(subj, b"msg-2", wait=True)
82+
await js.publish(subj, b"msg-3", wait=True)
83+
info = await stream.get_info()
84+
assert info.state.messages == 3
85+
86+
await stream.delete_message(sequence=2)
87+
88+
info = await stream.get_info()
89+
assert info.state.messages == 2
90+
finally:
91+
await js.streams.delete(name)
92+
93+
94+
async def test_consumers_list(js: JetStream) -> None:
95+
name = f"test-clist-{uuid.uuid4().hex[:8]}"
96+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
97+
stream = await js.streams.create(config)
98+
try:
99+
consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}"
100+
consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}"
101+
await stream.consumers.create(PullConsumerConfig(name=consumer_name1))
102+
await stream.consumers.create(PullConsumerConfig(name=consumer_name2))
103+
104+
consumers_iter = await stream.consumers.list()
105+
found = []
106+
async for consumer in consumers_iter:
107+
assert isinstance(consumer, (PullConsumer, PushConsumer))
108+
found.append(consumer)
109+
assert len(found) == 2
110+
finally:
111+
await js.streams.delete(name)
112+
113+
114+
async def test_consumers_list_returns_correct_types(js: JetStream) -> None:
115+
name = f"test-cltype-{uuid.uuid4().hex[:8]}"
116+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
117+
stream = await js.streams.create(config)
118+
try:
119+
pull_name = f"pull-{uuid.uuid4().hex[:8]}"
120+
push_name = f"push-{uuid.uuid4().hex[:8]}"
121+
await stream.consumers.create(PullConsumerConfig(name=pull_name))
122+
deliver_subj = uuid.uuid4().hex
123+
await stream.consumers.create(
124+
PushConsumerConfig(deliver_subject=deliver_subj, name=push_name),
125+
)
126+
127+
consumers_iter = await stream.consumers.list()
128+
types_found: dict[str, type] = {}
129+
async for consumer in consumers_iter:
130+
if isinstance(consumer, PullConsumer):
131+
types_found["pull"] = type(consumer)
132+
elif isinstance(consumer, PushConsumer):
133+
types_found["push"] = type(consumer)
134+
assert types_found.get("pull") is PullConsumer
135+
assert types_found.get("push") is PushConsumer
136+
finally:
137+
await js.streams.delete(name)
138+
139+
140+
async def test_consumers_list_names(js: JetStream) -> None:
141+
name = f"test-clnames-{uuid.uuid4().hex[:8]}"
142+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
143+
stream = await js.streams.create(config)
144+
try:
145+
consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}"
146+
consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}"
147+
await stream.consumers.create(PullConsumerConfig(name=consumer_name1))
148+
await stream.consumers.create(PullConsumerConfig(name=consumer_name2))
149+
150+
names_iter = await stream.consumers.list_names()
151+
found_names: list[str] = []
152+
async for cname in names_iter:
153+
assert isinstance(cname, str)
154+
found_names.append(cname)
155+
assert sorted(found_names) == sorted([consumer_name1, consumer_name2])
156+
finally:
157+
await js.streams.delete(name)
158+
159+
160+
async def test_consumers_list_empty(js: JetStream) -> None:
161+
name = f"test-clempty-{uuid.uuid4().hex[:8]}"
162+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
163+
stream = await js.streams.create(config)
164+
try:
165+
consumers_iter = await stream.consumers.list()
166+
found = []
167+
async for consumer in consumers_iter:
168+
found.append(consumer)
169+
assert len(found) == 0
170+
finally:
171+
await js.streams.delete(name)
172+
173+
174+
async def test_consumers_list_names_empty(js: JetStream) -> None:
175+
name = f"test-clnempty-{uuid.uuid4().hex[:8]}"
176+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
177+
stream = await js.streams.create(config)
178+
try:
179+
names_iter = await stream.consumers.list_names()
180+
found_names: list[str] = []
181+
async for cname in names_iter:
182+
found_names.append(cname)
183+
assert len(found_names) == 0
184+
finally:
185+
await js.streams.delete(name)

0 commit comments

Comments
 (0)