@@ -2,19 +2,27 @@ use rabbitmq_stream_client::error::StreamCreateError;
22use rabbitmq_stream_client:: types:: {
33 ByteCapacity , HashRoutingMurmurStrategy , Message , ResponseCode , RoutingStrategy ,
44} ;
5+ use std:: convert:: TryInto ;
56use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
67use std:: sync:: Arc ;
78use tokio:: sync:: Notify ;
89
910fn hash_strategy_value_extractor ( message : & Message ) -> String {
10- String :: from_utf8 ( Vec :: from ( message. data ( ) . unwrap ( ) ) ) . expect ( "Found invalid UTF-8" )
11+ message
12+ . application_properties ( )
13+ . unwrap ( )
14+ . get ( "id" )
15+ . unwrap ( )
16+ . clone ( )
17+ . try_into ( )
18+ . unwrap ( )
1119}
1220
1321#[ tokio:: main]
1422async fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
1523 use rabbitmq_stream_client:: Environment ;
1624 let environment = Environment :: builder ( ) . build ( ) . await ?;
17- let message_count = 100 ;
25+ let message_count = 1000000 ;
1826 let stream = "hello-rust-stream" ;
1927 let confirmed_messages = Arc :: new ( AtomicU32 :: new ( 0 ) ) ;
2028 let notify_on_send = Arc :: new ( Notify :: new ( ) ) ;
@@ -54,10 +62,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5462 . unwrap ( ) ;
5563
5664 for i in 0 ..message_count {
57- println ! ( "sending message {}" , i) ;
5865 let counter = confirmed_messages. clone ( ) ;
5966 let notifier = notify_on_send. clone ( ) ;
60- let msg = Message :: builder ( ) . body ( format ! ( "message{}" , i) ) . build ( ) ;
67+ let msg = Message :: builder ( )
68+ . body ( format ! ( "message{}" , i) )
69+ . application_properties ( )
70+ . insert ( "id" , i. to_string ( ) )
71+ . message_builder ( )
72+ . build ( ) ;
6173 super_stream_producer
6274 . send ( msg, move |_| {
6375 let inner_counter = counter. clone ( ) ;
0 commit comments