Skip to content

Commit 2b0d441

Browse files
committed
Some jetstream updates.
1 parent 5d9a923 commit 2b0d441

File tree

5 files changed

+103
-23
lines changed

5 files changed

+103
-23
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ async-nats = "0.46"
1313
bytes = "1.11.1"
1414
futures-util = "0.3.32"
1515
log = "0.4.29"
16-
pyo3 = "0.28"
16+
pyo3 = { version = "0.28", features = ["abi3"] }
1717
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
1818
pyo3-log = "0.13.3"
1919
thiserror = "2.0.18"

src/jetstream/jetstream.rs

Lines changed: 90 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use std::ops::Deref;
12
use std::sync::Arc;
3+
use std::time::Duration;
24

35
use async_nats::Subject;
46
use async_nats::client::traits::Publisher;
57
use async_nats::connection::State;
6-
use pyo3::Py;
78
use pyo3::types::{PyBytesMethods, PyDict};
89
use pyo3::{Bound, PyAny, Python, pyclass, pymethods, types::PyBytes};
910
use tokio::sync::RwLock;
@@ -13,6 +14,44 @@ use crate::jetstream::kv::KeyValue;
1314
use crate::utils::headers::NatsrpyHeadermapExt;
1415
use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future};
1516

17+
#[pyclass]
18+
pub enum StorageType {
19+
File,
20+
Memory,
21+
}
22+
23+
impl From<&StorageType> for async_nats::jetstream::stream::StorageType {
24+
fn from(value: &StorageType) -> Self {
25+
match value {
26+
StorageType::File => Self::File,
27+
StorageType::Memory => Self::Memory,
28+
}
29+
}
30+
}
31+
32+
#[pyclass]
33+
pub struct Republish {
34+
pub source: String,
35+
pub destination: String,
36+
pub headers_only: bool,
37+
}
38+
impl From<&Republish> for async_nats::jetstream::stream::Republish {
39+
fn from(value: &Republish) -> Self {
40+
Self {
41+
source: value.source.clone(),
42+
destination: value.destination.clone(),
43+
headers_only: value.headers_only.clone(),
44+
}
45+
}
46+
}
47+
48+
#[pyclass]
49+
pub struct Source {
50+
pub name: String,
51+
pub filter_subject: Option<String>,
52+
pub external: bool,
53+
}
54+
1655
#[pyclass]
1756
pub struct JetStream {
1857
ctx: Arc<RwLock<async_nats::jetstream::Context>>,
@@ -29,11 +68,11 @@ impl JetStream {
2968
#[pymethods]
3069
impl JetStream {
3170
#[pyo3(signature = (
32-
subject,
33-
payload,
34-
*,
35-
headers=None,
36-
reply=None,
71+
subject,
72+
payload,
73+
*,
74+
headers=None,
75+
reply=None,
3776
err_on_disconnect = false
3877
))]
3978
pub fn publish<'a>(
@@ -70,22 +109,58 @@ impl JetStream {
70109
&self,
71110
py: Python<'a>,
72111
bucket: String,
112+
description: Option<String>,
113+
max_value_size: Option<i32>,
114+
history: Option<i64>,
115+
max_age: Option<f32>,
116+
max_bytes: Option<i64>,
117+
storage: Option<Bound<'a, StorageType>>,
118+
num_replicas: Option<usize>,
119+
republish: Option<Bound<'a, Republish>>,
120+
mirror: Option<Bound<'a, Source>>,
121+
// sources: Option<Vec<Source>>,
122+
mirror_direct: Option<bool>,
123+
compression: Option<bool>,
124+
// placement: Option<Place>,
125+
limit_markers: Option<f32>,
73126
) -> NatsrpyResult<Bound<'a, PyAny>> {
74127
let ctx = self.ctx.clone();
128+
let storage = storage.map(|val| val.borrow().deref().into());
129+
let republish = republish.map(|val| val.borrow().deref().into());
75130
natsrpy_future(py, async move {
76131
let js = ctx.read().await;
77-
Ok(KeyValue::new(js.create_key_value(async_nats::jetstream::kv::Config{
78-
bucket: bucket,
79-
// todo!("Add other config options")
132+
let mut config = async_nats::jetstream::kv::Config {
133+
bucket: bucket.clone(),
80134
..Default::default()
81-
}).await?))
135+
};
136+
description
137+
.into_iter()
138+
.for_each(|descr| config.description = descr);
139+
max_value_size
140+
.into_iter()
141+
.for_each(|val| config.max_value_size = val);
142+
history.into_iter().for_each(|val| config.history = val);
143+
max_age
144+
.into_iter()
145+
.for_each(|val| config.max_age = Duration::from_secs_f32(val));
146+
max_bytes.into_iter().for_each(|val| config.max_bytes = val);
147+
num_replicas
148+
.into_iter()
149+
.for_each(|val| config.num_replicas = val);
150+
mirror_direct
151+
.into_iter()
152+
.for_each(|val| config.mirror_direct = val);
153+
compression
154+
.into_iter()
155+
.for_each(|val| config.compression = val);
156+
storage.into_iter().for_each(|val| config.storage = val);
157+
config.republish = republish;
158+
config.limit_markers = limit_markers.map(Duration::from_secs_f32);
159+
160+
Ok(KeyValue::new(js.create_key_value(config).await?))
82161
})
83162
}
84-
pub fn get_kv<'a>(
85-
&self,
86-
py: Python<'a>,
87-
bucket: String,
88-
) -> NatsrpyResult<Bound<'a, PyAny>> {
163+
pub fn get_kv<'a>(&self, py: Python<'a>, bucket: String) -> NatsrpyResult<Bound<'a, PyAny>> {
89164
let ctx = self.ctx.clone();
90165
natsrpy_future(py, async move {
91166
let js = ctx.read().await;

src/jetstream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ use pyo3::pymodule;
99
pub mod pymod {
1010
#[pymodule_export]
1111
use super::jetstream::JetStream;
12+
13+
#[pymodule_export]
14+
use super::jetstream::StorageType;
1215
}

src/nats_cls.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,11 @@ impl NatsCls {
205205
}
206206

207207
#[pyo3(signature = (
208-
*,
209-
domain=None,
208+
*,
209+
domain=None,
210210
api_prefix=None,
211-
timeout=None,
212-
ack_timeout=None,
211+
timeout=None,
212+
ack_timeout=None,
213213
concurrency_limit = None,
214214
max_ack_inflight=None,
215215
backpressure_on_inflight=None,
@@ -247,11 +247,13 @@ impl NatsCls {
247247
builder = builder.backpressure_on_inflight(backpressure_on_inflight);
248248
}
249249
if domain.is_some() && api_prefix.is_some() {
250-
return Err(NatsrpyError::InvalidArgument(String::from("Either domain or api_prefix should be specified, not both.")));
250+
return Err(NatsrpyError::InvalidArgument(String::from(
251+
"Either domain or api_prefix should be specified, not both.",
252+
)));
251253
}
252254
let js = if let Some(api_prefix) = api_prefix {
253255
builder.api_prefix(api_prefix).build(session.clone())
254-
} else if let Some(domain) = domain{
256+
} else if let Some(domain) = domain {
255257
builder.domain(domain).build(session.clone())
256258
} else {
257259
builder.build(session.clone())

src/subscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};
44

55
use pyo3::Py;
66
use pyo3::types::PyBytes;
7-
use pyo3::{Bound, IntoPyObjectExt, PyAny, PyRef, Python, pyclass, pymethods};
7+
use pyo3::{Bound, PyAny, PyRef, Python, pyclass, pymethods};
88
use tokio::sync::Mutex;
99

1010
use crate::exceptions::rust_err::NatsrpyError;

0 commit comments

Comments
 (0)