-
-
Notifications
You must be signed in to change notification settings - Fork 970
Add support for NATS JetStream as a transport #2299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
joeriddles
wants to merge
17
commits into
celery:main
Choose a base branch
from
joeriddles:nats
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 13 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
5514e2a
Implement basic NATS JetStream transport
joeriddles 5a43ee2
Add NATS demo server to examples
joeriddles 943f310
Rename module to nats.py
joeriddles 793fb97
Use range for nats dependency
joeriddles 9daa834
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] b147b45
Add NATS integration tests
joeriddles 0b21e60
Fix default NATS max_msgs_per_subject
joeriddles 1bacb07
Fix NATS healthcheck_retries
joeriddles 1a320ab
Fix NATS integration test Dockerfile
joeriddles 722111a
Improve NATS event loop logic
joeriddles e78076c
Add NATS doc
joeriddles a12fac6
Fix codespell
joeriddles f45fbfc
Fix nats integration test not running in tox
joeriddles 2e67938
Merge branch 'main' into nats
auvipy 0428d5a
Add NATS back to tox
joeriddles ee076ad
Fix flake8 blank line contains whitespace
joeriddles b254371
fix tox
joeriddles File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| ================================================ | ||
| NATS Transport - ``kombu.transport.nats`` | ||
| ================================================ | ||
|
|
||
| .. currentmodule:: kombu.transport.nats | ||
|
|
||
| .. automodule:: kombu.transport.nats | ||
|
|
||
| .. contents:: | ||
| :local: | ||
|
|
||
| Transport | ||
| --------- | ||
|
|
||
| .. autoclass:: Transport | ||
| :members: | ||
| :undoc-members: | ||
|
|
||
| Channel | ||
| ------- | ||
|
|
||
| .. autoclass:: Channel | ||
| :members: | ||
| :undoc-members: | ||
|
|
||
| Message | ||
| ------- | ||
|
|
||
| .. autoclass:: Message | ||
| :members: | ||
| :undoc-members: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import sys | ||
| from pprint import pformat | ||
|
|
||
| from kombu import Connection, Consumer, Exchange, Queue, eventloop | ||
|
|
||
| LOCAL_SERVER = "localhost" | ||
| DEMO_SERVER = "demo.nats.io" | ||
|
|
||
| server = LOCAL_SERVER | ||
| use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo" | ||
| if use_demo_server: | ||
| server = DEMO_SERVER | ||
|
|
||
|
|
||
| exchange = Exchange("exchange", "direct", durable=False) | ||
| msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages") | ||
|
|
||
|
|
||
| def pretty(obj): | ||
| return pformat(obj, indent=4) | ||
|
|
||
|
|
||
| def process_msg(body, message): | ||
| print(f"Received message: {body!r}") | ||
| print(f" properties:\n{pretty(message.properties)}") | ||
| print(f" delivery_info:\n{pretty(message.delivery_info)}") | ||
| message.ack() | ||
|
|
||
|
|
||
| with Connection(f"nats://{server}:4222") as connection: | ||
| with Consumer(connection, msg_queue, callbacks=[process_msg]) as consumer: | ||
| for msg in eventloop(connection): | ||
| pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import sys | ||
|
|
||
| from nats.js.api import StorageType | ||
|
|
||
| from kombu import Connection, Exchange, Queue | ||
|
|
||
| LOCAL_SERVER = "localhost" | ||
| DEMO_SERVER = "demo.nats.io" | ||
|
|
||
| server = LOCAL_SERVER | ||
| use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo" | ||
| if use_demo_server: | ||
| server = DEMO_SERVER | ||
|
|
||
|
|
||
| exchange = Exchange("exchange", "direct", durable=False) | ||
| msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages") | ||
|
|
||
|
|
||
| with Connection(f"nats://{server}:4222", transport_options={ | ||
| "stream_config": { | ||
| "storage": StorageType.FILE, | ||
| } | ||
| }) as conn: | ||
| producer = conn.Producer() | ||
| producer.publish( | ||
| "hello world", exchange=exchange, routing_key="messages", declare=[msg_queue] | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.