Skip to content

Commit 3325f06

Browse files
committed
improve super_stream send example
1 parent 153432e commit 3325f06

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

examples/send_super_stream.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,27 @@ use rabbitmq_stream_client::error::StreamCreateError;
22
use rabbitmq_stream_client::types::{
33
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
44
};
5+
use std::convert::TryInto;
56
use std::sync::atomic::{AtomicU32, Ordering};
67
use std::sync::Arc;
78
use tokio::sync::Notify;
89

910
fn hash_strategy_value_extractor(message: &Message) -> String {
10-
String::from_utf8(Vec::from(message.data().unwrap())).expect("Found invalid UTF-8")
11+
message
12+
.application_properties()
13+
.unwrap()
14+
.get("id")
15+
.unwrap()
16+
.clone()
17+
.try_into()
18+
.unwrap()
1119
}
1220

1321
#[tokio::main]
1422
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1523
use rabbitmq_stream_client::Environment;
1624
let environment = Environment::builder().build().await?;
17-
let message_count = 100;
25+
let message_count = 1000000;
1826
let stream = "hello-rust-stream";
1927
let confirmed_messages = Arc::new(AtomicU32::new(0));
2028
let notify_on_send = Arc::new(Notify::new());
@@ -54,10 +62,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5462
.unwrap();
5563

5664
for i in 0..message_count {
57-
println!("sending message {}", i);
5865
let counter = confirmed_messages.clone();
5966
let notifier = notify_on_send.clone();
60-
let msg = Message::builder().body(format!("message{}", i)).build();
67+
let msg = Message::builder()
68+
.body(format!("message{}", i))
69+
.application_properties()
70+
.insert("id", i.to_string())
71+
.message_builder()
72+
.build();
6173
super_stream_producer
6274
.send(msg, move |_| {
6375
let inner_counter = counter.clone();

0 commit comments

Comments
 (0)