Skip to content

Commit 9f574f7

Browse files
committed
implement filtering on super_stream
1 parent 2eef540 commit 9f574f7

File tree

4 files changed

+26
-9
lines changed

4 files changed

+26
-9
lines changed

src/environment.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl Environment {
5959
SuperStreamProducerBuilder {
6060
environment: self.clone(),
6161
data: PhantomData,
62-
//filter_value_extractor: None,
62+
filter_value_extractor: None,
6363
route_strategy: routing_strategy,
6464
}
6565
}
@@ -78,6 +78,7 @@ impl Environment {
7878
SuperStreamConsumerBuilder {
7979
environment: self.clone(),
8080
offset_specification: OffsetSpecification::Next,
81+
filter_configuration: None,
8182
}
8283
}
8384

src/producer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,14 @@ impl<T> ProducerBuilder<T> {
242242
self.filter_value_extractor = Some(f);
243243
self
244244
}
245+
246+
pub fn filter_value_extractor_arc(
247+
mut self,
248+
filter_value_extractor: Option<FilterValueExtractor>,
249+
) -> Self {
250+
self.filter_value_extractor = filter_value_extractor;
251+
self
252+
}
245253
}
246254

247255
pub struct MessageAccumulator {

src/superstream_consumer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::consumer::Delivery;
22
use crate::error::{ConsumerCloseError, ConsumerDeliveryError};
33
use crate::superstream::DefaultSuperStreamMetadata;
4-
use crate::{error::ConsumerCreateError, ConsumerHandle, Environment};
4+
use crate::{error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration};
55
use futures::task::AtomicWaker;
66
use futures::{Stream, StreamExt};
77
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
@@ -13,8 +13,6 @@ use std::task::{Context, Poll};
1313
use tokio::sync::mpsc::{channel, Receiver};
1414
use tokio::task;
1515

16-
//type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
17-
1816
/// API for consuming RabbitMQ stream messages
1917
pub struct SuperStreamConsumer {
2018
internal: Arc<SuperStreamConsumerInternal>,
@@ -25,12 +23,14 @@ struct SuperStreamConsumerInternal {
2523
closed: Arc<AtomicBool>,
2624
handlers: Vec<ConsumerHandle>,
2725
waker: AtomicWaker,
26+
filter_configuration: Option<FilterConfiguration>,
2827
}
2928

3029
/// Builder for [`Consumer`]
3130
pub struct SuperStreamConsumerBuilder {
3231
pub(crate) environment: Environment,
3332
pub(crate) offset_specification: OffsetSpecification,
33+
pub(crate) filter_configuration: Option<FilterConfiguration>,
3434
}
3535

3636
impl SuperStreamConsumerBuilder {
@@ -58,6 +58,7 @@ impl SuperStreamConsumerBuilder {
5858
.environment
5959
.consumer()
6060
.offset(self.offset_specification.clone())
61+
.filter_input(self.filter_configuration.clone())
6162
.build(partition.as_str())
6263
.await
6364
.unwrap();
@@ -75,6 +76,7 @@ impl SuperStreamConsumerBuilder {
7576
closed: Arc::new(AtomicBool::new(false)),
7677
handlers,
7778
waker: AtomicWaker::new(),
79+
filter_configuration: self.filter_configuration.clone(),
7880
};
7981

8082
Ok(SuperStreamConsumer {

src/superstream_producer.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::future::Future;
1212
use std::marker::PhantomData;
1313
use std::sync::Arc;
1414

15-
//type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
15+
type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
1616

1717
#[derive(Clone)]
1818
pub struct SuperStreamProducer<T>(
@@ -25,7 +25,7 @@ pub struct SuperStreamProducer<T>(
2525
/// Builder for [`SuperStreamProducer`]
2626
pub struct SuperStreamProducerBuilder<T> {
2727
pub(crate) environment: Environment,
28-
//pub filter_value_extractor: Option<FilterValueExtractor>,
28+
pub filter_value_extractor: Option<FilterValueExtractor>,
2929
pub route_strategy: RoutingStrategy,
3030
pub(crate) data: PhantomData<T>,
3131
}
@@ -34,7 +34,7 @@ pub struct SuperStreamProducerInternal {
3434
pub(crate) environment: Environment,
3535
client: Client,
3636
// TODO: implement filtering for superstream
37-
//filter_value_extractor: Option<FilterValueExtractor>,
37+
filter_value_extractor: Option<FilterValueExtractor>,
3838
routing_strategy: RoutingStrategy,
3939
}
4040

@@ -62,7 +62,13 @@ impl SuperStreamProducer<NoDedup> {
6262

6363
for route in routes.into_iter() {
6464
if !self.1.contains_key(route.as_str()) {
65-
let producer = self.0.environment.producer().build(route.as_str()).await;
65+
let producer = self
66+
.0
67+
.environment
68+
.producer()
69+
.filter_value_extractor_arc(self.0.filter_value_extractor.clone())
70+
.build(route.as_str())
71+
.await;
6672
self.1.insert(route.clone(), producer.unwrap());
6773
}
6874

@@ -115,7 +121,7 @@ impl<T> SuperStreamProducerBuilder<T> {
115121
let super_stream_producer = SuperStreamProducerInternal {
116122
environment: self.environment.clone(),
117123
client,
118-
//filter_value_extractor: self.filter_value_extractor,
124+
filter_value_extractor: self.filter_value_extractor,
119125
routing_strategy: self.route_strategy,
120126
};
121127

0 commit comments

Comments
 (0)