@@ -6,7 +6,7 @@ use util::*;
66#[ cfg( feature = "test" ) ]
77#[ cfg_attr( feature = "runtime-tokio" , tokio:: test) ]
88#[ cfg_attr( feature = "runtime-async-std" , async_std:: test) ]
9- async fn main ( ) -> anyhow:: Result < ( ) > {
9+ async fn realtime_1 ( ) -> anyhow:: Result < ( ) > {
1010 use sea_streamer_redis:: {
1111 AutoStreamReset , RedisConnectOptions , RedisConsumerOptions , RedisStreamer ,
1212 } ;
@@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
1717 } ;
1818 use std:: time:: Duration ;
1919
20- const TEST : & str = "realtime " ;
20+ const TEST : & str = "realtime_1 " ;
2121 env_logger:: init ( ) ;
2222 test ( false ) . await ?;
2323 test ( true ) . await ?;
@@ -135,3 +135,120 @@ async fn main() -> anyhow::Result<()> {
135135
136136 Ok ( ( ) )
137137}
138+
139+ #[ cfg( feature = "test" ) ]
140+ #[ cfg_attr( feature = "runtime-tokio" , tokio:: test) ]
141+ #[ cfg_attr( feature = "runtime-async-std" , async_std:: test) ]
142+ async fn realtime_2 ( ) -> anyhow:: Result < ( ) > {
143+ use sea_streamer_redis:: {
144+ AutoStreamReset , RedisConnectOptions , RedisConsumerOptions , RedisResult , RedisStreamer ,
145+ } ;
146+ use sea_streamer_runtime:: sleep;
147+ use sea_streamer_types:: {
148+ export:: futures:: { Stream , StreamExt } ,
149+ Buffer , ConsumerMode , ConsumerOptions , Message , Producer , ShardId , SharedMessage ,
150+ StreamKey , Streamer , StreamerUri , Timestamp ,
151+ } ;
152+ use std:: time:: Duration ;
153+
154+ const TEST : & str = "realtime_2" ;
155+ env_logger:: init ( ) ;
156+ test ( false ) . await ?;
157+
158+ async fn test ( enable_cluster : bool ) -> anyhow:: Result < ( ) > {
159+ println ! ( "Enable Cluster = {enable_cluster} ..." ) ;
160+
161+ let mut options = RedisConnectOptions :: default ( ) ;
162+ options. set_enable_cluster ( enable_cluster) ;
163+ let streamer = RedisStreamer :: connect (
164+ std:: env:: var ( "BROKERS_URL" )
165+ . unwrap_or_else ( |_| "redis://localhost" . to_owned ( ) )
166+ . parse :: < StreamerUri > ( )
167+ . unwrap ( ) ,
168+ options,
169+ )
170+ . await ?;
171+ println ! ( "Connect Streamer ... ok" ) ;
172+
173+ let now = Timestamp :: now_utc ( ) ;
174+ let stream_key = StreamKey :: new ( format ! (
175+ "{}-{}a" ,
176+ TEST ,
177+ now. unix_timestamp_nanos( ) / 1_000_000
178+ ) ) ?;
179+ let zero = ShardId :: new ( 0 ) ;
180+
181+ let mut producer = streamer. create_generic_producer ( Default :: default ( ) ) . await ?;
182+
183+ println ! ( "Producing 0..5 ..." ) ;
184+ let mut sequence = 0 ;
185+ for i in 0 ..5 {
186+ let message = format ! ( "{i}" ) ;
187+ let receipt = producer. send_to ( & stream_key, message) ?. await ?;
188+ assert_eq ! ( receipt. stream_key( ) , & stream_key) ;
189+ // should always increase
190+ assert ! ( receipt. sequence( ) > & sequence) ;
191+ sequence = * receipt. sequence ( ) ;
192+ assert_eq ! ( receipt. shard_id( ) , & zero) ;
193+ }
194+
195+ let mut options = RedisConsumerOptions :: new ( ConsumerMode :: RealTime ) ;
196+ options. set_auto_stream_reset ( AutoStreamReset :: Latest ) ;
197+
198+ let mut half = streamer
199+ . create_consumer ( & [ stream_key. clone ( ) ] , options. clone ( ) )
200+ . await ?
201+ . into_stream ( ) ;
202+
203+ // Why do we have to wait? We want consumer to have started reading
204+ // before producing any messages. While after `create` returns the consumer
205+ // is ready (connection opened), there is still a small delay to send an `XREAD`
206+ // operation to the server.
207+ sleep ( Duration :: from_millis ( 5 ) ) . await ;
208+
209+ println ! ( "Producing 5..10 ..." ) ;
210+ for i in 5 ..10 {
211+ let message = format ! ( "{i}" ) ;
212+ producer. send_to ( & stream_key, message) ?;
213+ }
214+
215+ println ! ( "Flush producer ..." ) ;
216+ producer. flush ( ) . await ?;
217+
218+ options. set_auto_stream_reset ( AutoStreamReset :: Earliest ) ;
219+ let mut full = streamer
220+ . create_consumer ( & [ stream_key. clone ( ) ] , options)
221+ . await ?
222+ . into_stream ( ) ;
223+
224+ let seq = stream_n ( & mut half, 5 ) . await ?;
225+ assert_eq ! ( seq, [ 5 , 6 , 7 , 8 , 9 ] ) ;
226+ println ! ( "Stream latest ... ok" ) ;
227+
228+ let seq = stream_n ( & mut full, 10 ) . await ?;
229+ assert_eq ! ( seq, [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ] ) ;
230+ println ! ( "Stream all ... ok" ) ;
231+
232+ println ! ( "End test case." ) ;
233+ Ok ( ( ) )
234+ }
235+
236+ async fn stream_n < S : Stream < Item = RedisResult < SharedMessage > > + std:: marker:: Unpin > (
237+ stream : & mut S ,
238+ num : usize ,
239+ ) -> anyhow:: Result < Vec < usize > > {
240+ let mut numbers = Vec :: new ( ) ;
241+ for _ in 0 ..num {
242+ match stream. next ( ) . await {
243+ Some ( mess) => {
244+ let mess = mess?;
245+ numbers. push ( mess. message ( ) . as_str ( ) . unwrap ( ) . parse :: < usize > ( ) . unwrap ( ) ) ;
246+ }
247+ None => panic ! ( "Stream ended?" ) ,
248+ }
249+ }
250+ Ok ( numbers)
251+ }
252+
253+ Ok ( ( ) )
254+ }
0 commit comments