Skip to content

Commit 9daed84

Browse files
committed
adding super_stream_filtering tests
1 parent 0a3a6ec commit 9daed84

File tree

2 files changed

+127
-0
lines changed

2 files changed

+127
-0
lines changed

tests/integration/consumer_test.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use rabbitmq_stream_client::{
1414
use tokio::task;
1515
use tokio::time::sleep;
1616

17+
use crate::producer_test::routing_key_strategy_value_extractor;
1718
use rabbitmq_stream_client::types::{
1819
HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy,
1920
};
@@ -505,6 +506,88 @@ async fn consumer_test_with_filtering() {
505506
producer.close().await.unwrap();
506507
}
507508

509+
#[tokio::test(flavor = "multi_thread")]
510+
async fn super_stream_consumer_test_with_filtering() {
511+
let env = TestEnvironment::create_super_stream().await;
512+
let reference: String = Faker.fake();
513+
514+
let message_count = 10;
515+
let mut super_stream_producer = env
516+
.env
517+
.super_stream_producer(RoutingStrategy::RoutingKeyStrategy(
518+
RoutingKeyRoutingStrategy {
519+
routing_extractor: &routing_key_strategy_value_extractor,
520+
},
521+
))
522+
.filter_value_extractor(|_| "filtering".to_string())
523+
.build(&env.super_stream)
524+
.await
525+
.unwrap();
526+
527+
let filter_configuration = FilterConfiguration::new(vec!["filtering".to_string()], false)
528+
.post_filter(|message| {
529+
String::from_utf8(message.data().unwrap().to_vec()).unwrap_or("".to_string())
530+
== "filtering".to_string()
531+
});
532+
533+
let mut super_stream_consumer = env
534+
.env
535+
.super_stream_consumer()
536+
.offset(OffsetSpecification::First)
537+
.filter_input(Some(filter_configuration))
538+
.build(&env.super_stream)
539+
.await
540+
.unwrap();
541+
542+
for _ in 0..message_count {
543+
let _ = super_stream_producer
544+
.send(
545+
Message::builder().body("filtering").build(),
546+
|_| async move {},
547+
)
548+
.await;
549+
550+
let _ = super_stream_producer
551+
.send(
552+
Message::builder().body("filtering").build(),
553+
|_| async move {},
554+
)
555+
.await;
556+
}
557+
558+
let response = Arc::new(tokio::sync::Mutex::new(vec![]));
559+
let response_clone = Arc::clone(&response);
560+
561+
let task = tokio::task::spawn(async move {
562+
loop {
563+
let delivery = super_stream_consumer.next().await.unwrap();
564+
565+
let d = delivery.unwrap();
566+
let data = d
567+
.message()
568+
.data()
569+
.map(|data| String::from_utf8(data.to_vec()).unwrap())
570+
.unwrap();
571+
572+
let mut r = response_clone.lock().await;
573+
r.push(data);
574+
}
575+
});
576+
577+
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(3), task).await;
578+
let repsonse_length = response.lock().await.len();
579+
let filtering_response_length = response
580+
.lock()
581+
.await
582+
.iter()
583+
.filter(|item| item == &&"filtering")
584+
.collect::<Vec<_>>()
585+
.len();
586+
587+
assert!(repsonse_length == filtering_response_length);
588+
super_stream_producer.close().await.unwrap();
589+
}
590+
508591
#[tokio::test(flavor = "multi_thread")]
509592
async fn consumer_test_with_filtering_match_unfiltered() {
510593
let env = TestEnvironment::create().await;

tests/integration/producer_test.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,3 +523,47 @@ async fn producer_send_filtering_message() {
523523
true
524524
);
525525
}
526+
527+
#[tokio::test(flavor = "multi_thread")]
528+
async fn super_stream_producer_send_filtering_message() {
529+
let env = TestEnvironment::create_super_stream().await;
530+
let mut super_stream_producer = env
531+
.env
532+
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
533+
HashRoutingMurmurStrategy {
534+
routing_extractor: &hash_strategy_value_extractor,
535+
},
536+
))
537+
.filter_value_extractor(|message| {
538+
let app_properties = message.application_properties();
539+
match app_properties {
540+
Some(properties) => {
541+
let value = properties.get("region").and_then(|item| match item {
542+
SimpleValue::String(s) => Some(s.clone()),
543+
_ => None,
544+
});
545+
value.unwrap_or(String::from(""))
546+
}
547+
None => String::from(""),
548+
}
549+
})
550+
.build(&env.super_stream)
551+
.await
552+
.unwrap();
553+
554+
let message_builder = Message::builder();
555+
let mut application_properties = message_builder.application_properties();
556+
application_properties = application_properties.insert("region", "emea");
557+
558+
let message = application_properties
559+
.message_builder()
560+
.body(b"message".to_vec())
561+
.build();
562+
563+
let closed = super_stream_producer.send(message, |_| async move {}).await;
564+
565+
match closed {
566+
Ok(_) => assert!(true),
567+
Err(_) => assert!(false),
568+
}
569+
}

0 commit comments

Comments
 (0)