From ac55fad7a943b1807a7e91e95808a48d08b1f0dc Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Thu, 5 Oct 2023 11:26:24 +0100 Subject: [PATCH] Fix creating consumer group against nonexisting stream. --- src/integration.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/integration.rs b/src/integration.rs index 307edbd..8473190 100644 --- a/src/integration.rs +++ b/src/integration.rs @@ -171,6 +171,7 @@ impl Integration { .arg(&key) .arg(&self.consumer_group) .arg(0) + .arg("MKSTREAM") .query_async(&mut redis_conn) .await { @@ -445,6 +446,7 @@ mod test { async fn test_integration() { let redis_url = env::var("TEST_REDIS_URL").unwrap_or("redis://127.0.0.1/1".to_string()); + setup_log(&Configuration::default()).unwrap(); register(Box::new(MockIntegration {})).await; let conf = Configuration { @@ -458,7 +460,7 @@ mod test { }; tokio::spawn(start(conf)); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let redis_client = redis::Client::open(redis_url).unwrap(); let mut redis_conn = redis_client.get_async_connection().await.unwrap(); @@ -478,7 +480,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = UPLINK_EVENTS .write() @@ -506,7 +508,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = JOIN_EVENTS .write() @@ -534,7 +536,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = ACK_EVENTS .write() @@ -562,7 +564,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = TXACK_EVENTS .write() @@ -590,7 +592,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = LOG_EVENTS .write() @@ -618,7 +620,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = STATUS_EVENTS .write() @@ -646,7 +648,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = LOCATION_EVENTS .write() @@ -674,7 +676,7 @@ mod test { .await .unwrap(); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(100)).await; let pl_recv = INTEGRATION_EVENTS .write()