From 23328e5c1e3d79434cfea18080c5855ef71af476 Mon Sep 17 00:00:00 2001 From: Tan Date: Fri, 12 May 2023 18:11:29 +0700 Subject: [PATCH 1/5] add more multiplexing_test --- crates/fluvio-socket/src/multiplexing.rs | 120 ++++++++++++++++++++++- 1 file changed, 116 insertions(+), 4 deletions(-) diff --git a/crates/fluvio-socket/src/multiplexing.rs b/crates/fluvio-socket/src/multiplexing.rs index 9d883277ec..9a93257af8 100644 --- a/crates/fluvio-socket/src/multiplexing.rs +++ b/crates/fluvio-socket/src/multiplexing.rs @@ -531,6 +531,8 @@ impl SharedMsg { mod tests { use std::time::Duration; + use std::io::ErrorKind; + use std::fmt::Debug; use async_trait::async_trait; use futures_util::future::{join, join3}; @@ -545,6 +547,7 @@ mod tests { use fluvio_protocol::api::RequestMessage; use super::MultiplexerSocket; + use super::SocketError; use crate::test_request::*; use crate::ExclusiveFlvSink; use crate::FluvioSocket; @@ -578,7 +581,21 @@ mod tests { } } - async fn test_server(addr: &str, mut handler: A) { + fn get_error_kind( + result: Result, + ) -> Option { + match result.unwrap_err() { + SocketError::Io { source, .. } => Some(source.kind()), + _ => None, + } + } + + async fn test_server( + addr: &str, + mut handler: A, + nb_iter: usize, + timeout: u64, + ) { let listener = TcpListener::bind(addr).await.expect("binding"); debug!("server is running"); let mut incoming = listener.incoming(); @@ -593,7 +610,7 @@ mod tests { let mut api_stream = stream.api_stream::(); - for i in 0..4u16 { + for i in 0..nb_iter { debug!("server: waiting for next msg: {}", i); let msg = api_stream.next().await.expect("msg").expect("unwrap"); debug!("server: msg received: {:#?}", msg); @@ -606,6 +623,7 @@ mod tests { debug!("server: received slow msg"); spawn(async move { sleep(Duration::from_millis(500)).await; + sleep(Duration::from_secs(timeout)).await; let resp = echo_request.new_response(EchoResponse::new("slow".to_owned())); debug!("server send slow response"); @@ -755,6 +773,77 @@ mod tests { assert_eq!(response.msg, "hello"); } + async fn test_client_closed_socket(addr: &str, mut handler: C) { + use std::time::SystemTime; + + sleep(Duration::from_millis(20)).await; + debug!("client: trying to connect"); + let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); + let socket = handler.connect(tcp_stream).await; + debug!("client: connected to test server and waiting..."); + sleep(Duration::from_millis(20)).await; + let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); + + let multiplexor2 = multiplexer.clone(); + + let (slow, fast) = join( + async move { + debug!("trying to send slow"); + // this message was send first but since there is delay of 500ms, it will return slower than fast + let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned())); + let response = multiplexer.send_and_receive(request).await; + assert!(response.is_err()); + + let err_kind = get_error_kind(response); + let expected = ErrorKind::UnexpectedEof; + assert_eq!(expected, err_kind.unwrap()); + debug!("client: socket was closed"); + + SystemTime::now() + }, + async move { + // this message will be send later than slow but since there is no delay, it should get earlier than first + sleep(Duration::from_millis(20)).await; + debug!("trying to send fast"); + let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned())); + let response = multiplexor2 + .send_and_receive(request) + .await + .expect("send success"); + debug!("received fast response"); + assert_eq!(response.msg, "hello"); + multiplexor2.terminate.notify(usize::MAX); //close multiplexor2 + SystemTime::now() + }, + ) + .await; + assert!(slow > fast); + } + + async fn test_client_time_out(addr: &str, mut handler: C) { + sleep(Duration::from_millis(20)).await; + debug!("client: trying to connect"); + let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); + let socket = handler.connect(tcp_stream).await; + debug!("client: connected to test server and waiting..."); + sleep(Duration::from_millis(20)).await; + let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); + + let fut = async move { + debug!("trying to send slow"); + + let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned())); + let response = multiplexer.send_and_receive(request).await; + assert!(response.is_err()); + + let err_kind = get_error_kind(response); + let expected: ErrorKind = ErrorKind::TimedOut; + assert_eq!(expected, err_kind.unwrap()); + debug!("client: socket was timeout"); + }; + fut.await; + } + #[fluvio_future::test(ignore)] async fn test_multiplexing() { debug!("start testing"); @@ -762,11 +851,34 @@ mod tests { let _r = join( test_client(addr, TcpStreamHandler {}), - test_server(addr, TcpStreamHandler {}), + test_server(addr, TcpStreamHandler {}, 4, 0), ) .await; } + #[fluvio_future::test(ignore)] + async fn test_multiplexing_close_socket() { + debug!("start test_multiplexing_close_socket"); + let addr = "127.0.0.1:6000"; + + let _r = join( + test_client_closed_socket(addr, TcpStreamHandler {}), + test_server(addr, TcpStreamHandler {}, 2, 0), + ) + .await; + } + + #[fluvio_future::test(ignore)] + async fn test_multiplexing_time_out() { + debug!("start test_multiplexing_timeout"); + let addr = "127.0.0.1:6000"; + + let _r = join( + test_client_time_out(addr, TcpStreamHandler {}), + test_server(addr, TcpStreamHandler {}, 1, 60), + ) + .await; + } #[cfg(unix)] mod tls_test { use std::os::unix::io::AsRawFd; @@ -856,7 +968,7 @@ mod tests { let _r = join( test_client(addr, TlsConnectorHandler::new()), - test_server(addr, TlsAcceptorHandler::new()), + test_server(addr, TlsAcceptorHandler::new(), 4, 0), ) .await; } From 1e6206a4c4c0e5b113f9aadc6f19a6a3df183e29 Mon Sep 17 00:00:00 2001 From: Tan Date: Sun, 14 May 2023 16:52:44 +0700 Subject: [PATCH 2/5] Update multiplexing.rs fix clippy issue --- crates/fluvio-socket/src/multiplexing.rs | 32 +++++++++++++----------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/crates/fluvio-socket/src/multiplexing.rs b/crates/fluvio-socket/src/multiplexing.rs index 9a93257af8..35a0d24000 100644 --- a/crates/fluvio-socket/src/multiplexing.rs +++ b/crates/fluvio-socket/src/multiplexing.rs @@ -532,8 +532,7 @@ mod tests { use std::time::Duration; use std::io::ErrorKind; - use std::fmt::Debug; - + use async_trait::async_trait; use futures_util::future::{join, join3}; use futures_util::io::{AsyncRead, AsyncWrite}; @@ -563,6 +562,9 @@ mod tests { #[allow(unused)] const X509_CLIENT_KEY: &str = "certs/certs/client.key"; + #[allow(unused)] + const SLEEP_MS: u64 = 10; + #[async_trait] trait AcceptorHandler { type Stream: AsyncRead + AsyncWrite + Unpin + Send; @@ -622,7 +624,7 @@ mod tests { if echo_request.request().msg == "slow" { debug!("server: received slow msg"); spawn(async move { - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(SLEEP_MS*50)).await; sleep(Duration::from_secs(timeout)).await; let resp = echo_request.new_response(EchoResponse::new("slow".to_owned())); @@ -649,7 +651,7 @@ mod tests { debug!("server: received async status msg"); let mut reply_sink = shared_sink.clone(); spawn(async move { - sleep(Duration::from_millis(30)).await; + sleep(Duration::from_millis(SLEEP_MS*3)).await; let resp = status_request.new_response(AsyncStatusResponse { status: status_request.request.count * 2, }); @@ -658,7 +660,7 @@ mod tests { .await .expect("send succeed"); debug!("server: send back status first"); - sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(SLEEP_MS*10)).await; let resp = status_request.new_response(AsyncStatusResponse { status: status_request.request.count * 4, }); @@ -694,12 +696,12 @@ mod tests { async fn test_client(addr: &str, mut handler: C) { use std::time::SystemTime; - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); let socket = handler.connect(tcp_stream).await; debug!("client: connected to test server and waiting..."); - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; let multiplexer = MultiplexerSocket::shared(socket); // create async status @@ -727,7 +729,7 @@ mod tests { }, async move { // this message will be send later than slow but since there is no delay, it should get earlier than first - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; debug!("trying to send fast"); let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned())); let response = multiplexor2 @@ -739,7 +741,7 @@ mod tests { SystemTime::now() }, async move { - sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(SLEEP_MS*10)).await; let response = status_response .next() .await @@ -776,12 +778,12 @@ mod tests { async fn test_client_closed_socket(addr: &str, mut handler: C) { use std::time::SystemTime; - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); let socket = handler.connect(tcp_stream).await; debug!("client: connected to test server and waiting..."); - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); let multiplexor2 = multiplexer.clone(); @@ -803,7 +805,7 @@ mod tests { }, async move { // this message will be send later than slow but since there is no delay, it should get earlier than first - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; debug!("trying to send fast"); let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned())); let response = multiplexor2 @@ -821,12 +823,12 @@ mod tests { } async fn test_client_time_out(addr: &str, mut handler: C) { - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); let socket = handler.connect(tcp_stream).await; debug!("client: connected to test server and waiting..."); - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS*2)).await; let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); let fut = async move { @@ -875,7 +877,7 @@ mod tests { let _r = join( test_client_time_out(addr, TcpStreamHandler {}), - test_server(addr, TcpStreamHandler {}, 1, 60), + test_server(addr, TcpStreamHandler {}, 1, 60),//MAX_WAIT_TIME is 60 second ) .await; } From 7a8539bf0d4bfa9df70d7163f7988d2952f4775c Mon Sep 17 00:00:00 2001 From: Tan Date: Sun, 14 May 2023 17:33:09 +0700 Subject: [PATCH 3/5] Update multiplexing.rs fix format issue --- crates/fluvio-socket/src/multiplexing.rs | 30 ++++++++++++------------ 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/fluvio-socket/src/multiplexing.rs b/crates/fluvio-socket/src/multiplexing.rs index 35a0d24000..b73704f33a 100644 --- a/crates/fluvio-socket/src/multiplexing.rs +++ b/crates/fluvio-socket/src/multiplexing.rs @@ -532,7 +532,7 @@ mod tests { use std::time::Duration; use std::io::ErrorKind; - + use async_trait::async_trait; use futures_util::future::{join, join3}; use futures_util::io::{AsyncRead, AsyncWrite}; @@ -563,7 +563,7 @@ mod tests { const X509_CLIENT_KEY: &str = "certs/certs/client.key"; #[allow(unused)] - const SLEEP_MS: u64 = 10; + const SLEEP_MS: u64 = 10; #[async_trait] trait AcceptorHandler { @@ -624,7 +624,7 @@ mod tests { if echo_request.request().msg == "slow" { debug!("server: received slow msg"); spawn(async move { - sleep(Duration::from_millis(SLEEP_MS*50)).await; + sleep(Duration::from_millis(SLEEP_MS * 50)).await; sleep(Duration::from_secs(timeout)).await; let resp = echo_request.new_response(EchoResponse::new("slow".to_owned())); @@ -651,7 +651,7 @@ mod tests { debug!("server: received async status msg"); let mut reply_sink = shared_sink.clone(); spawn(async move { - sleep(Duration::from_millis(SLEEP_MS*3)).await; + sleep(Duration::from_millis(SLEEP_MS * 3)).await; let resp = status_request.new_response(AsyncStatusResponse { status: status_request.request.count * 2, }); @@ -660,7 +660,7 @@ mod tests { .await .expect("send succeed"); debug!("server: send back status first"); - sleep(Duration::from_millis(SLEEP_MS*10)).await; + sleep(Duration::from_millis(SLEEP_MS * 10)).await; let resp = status_request.new_response(AsyncStatusResponse { status: status_request.request.count * 4, }); @@ -696,12 +696,12 @@ mod tests { async fn test_client(addr: &str, mut handler: C) { use std::time::SystemTime; - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); let socket = handler.connect(tcp_stream).await; debug!("client: connected to test server and waiting..."); - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; let multiplexer = MultiplexerSocket::shared(socket); // create async status @@ -729,7 +729,7 @@ mod tests { }, async move { // this message will be send later than slow but since there is no delay, it should get earlier than first - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; debug!("trying to send fast"); let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned())); let response = multiplexor2 @@ -741,7 +741,7 @@ mod tests { SystemTime::now() }, async move { - sleep(Duration::from_millis(SLEEP_MS*10)).await; + sleep(Duration::from_millis(SLEEP_MS * 10)).await; let response = status_response .next() .await @@ -778,12 +778,12 @@ mod tests { async fn test_client_closed_socket(addr: &str, mut handler: C) { use std::time::SystemTime; - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); let socket = handler.connect(tcp_stream).await; debug!("client: connected to test server and waiting..."); - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); let multiplexor2 = multiplexer.clone(); @@ -805,7 +805,7 @@ mod tests { }, async move { // this message will be send later than slow but since there is no delay, it should get earlier than first - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; debug!("trying to send fast"); let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned())); let response = multiplexor2 @@ -823,12 +823,12 @@ mod tests { } async fn test_client_time_out(addr: &str, mut handler: C) { - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); let socket = handler.connect(tcp_stream).await; debug!("client: connected to test server and waiting..."); - sleep(Duration::from_millis(SLEEP_MS*2)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); let fut = async move { @@ -877,7 +877,7 @@ mod tests { let _r = join( test_client_time_out(addr, TcpStreamHandler {}), - test_server(addr, TcpStreamHandler {}, 1, 60),//MAX_WAIT_TIME is 60 second + test_server(addr, TcpStreamHandler {}, 1, 60), //MAX_WAIT_TIME is 60 second ) .await; } From fd12c09d5d5250752160332d13c98b799d1c356e Mon Sep 17 00:00:00 2001 From: Tan Date: Tue, 16 May 2023 10:37:13 +0700 Subject: [PATCH 4/5] small update in test_time_out --- crates/fluvio-socket/src/multiplexing.rs | 31 ++++++++++++------------ 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/crates/fluvio-socket/src/multiplexing.rs b/crates/fluvio-socket/src/multiplexing.rs index b73704f33a..a34237de21 100644 --- a/crates/fluvio-socket/src/multiplexing.rs +++ b/crates/fluvio-socket/src/multiplexing.rs @@ -586,8 +586,8 @@ mod tests { fn get_error_kind( result: Result, ) -> Option { - match result.unwrap_err() { - SocketError::Io { source, .. } => Some(source.kind()), + match result { + Err(SocketError::Io { source, .. }) => Some(source.kind()), _ => None, } } @@ -625,7 +625,7 @@ mod tests { debug!("server: received slow msg"); spawn(async move { sleep(Duration::from_millis(SLEEP_MS * 50)).await; - sleep(Duration::from_secs(timeout)).await; + sleep(Duration::from_secs(timeout)).await; //simulate more waiting time from server while receiving slow msg let resp = echo_request.new_response(EchoResponse::new("slow".to_owned())); debug!("server send slow response"); @@ -796,9 +796,9 @@ mod tests { let response = multiplexer.send_and_receive(request).await; assert!(response.is_err()); - let err_kind = get_error_kind(response); + let err_kind = get_error_kind(response).expect("Get right Error Kind"); let expected = ErrorKind::UnexpectedEof; - assert_eq!(expected, err_kind.unwrap()); + assert_eq!(expected, err_kind); debug!("client: socket was closed"); SystemTime::now() @@ -831,19 +831,18 @@ mod tests { sleep(Duration::from_millis(SLEEP_MS * 2)).await; let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); - let fut = async move { - debug!("trying to send slow"); + let expected: ErrorKind = ErrorKind::TimedOut; - let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned())); - let response = multiplexer.send_and_receive(request).await; - assert!(response.is_err()); + debug!("trying to send slow"); - let err_kind = get_error_kind(response); - let expected: ErrorKind = ErrorKind::TimedOut; - assert_eq!(expected, err_kind.unwrap()); - debug!("client: socket was timeout"); - }; - fut.await; + let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned())); + let response = multiplexer.send_and_receive(request).await; + assert!(response.is_err()); + + let err_kind = get_error_kind(response).expect("Get right Error Kind"); + + assert_eq!(expected, err_kind); + debug!("client: socket was timeout"); } #[fluvio_future::test(ignore)] From 8126742e176999f07b21b61c7a56da1b808b646b Mon Sep 17 00:00:00 2001 From: Tan Date: Tue, 16 May 2023 20:25:36 +0700 Subject: [PATCH 5/5] increate fluvio-socket version --- Cargo.toml | 2 +- crates/fluvio-socket/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5981e725b6..5fd7f26caa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,7 +149,7 @@ fluvio-protocol = { version = "0.9.3", path = "crates/fluvio-protocol" } fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false } fluvio-sc-schema = { version = "0.19.0", path = "crates/fluvio-sc-schema", default-features = false } fluvio-service = { path = "crates/fluvio-service" } -fluvio-socket = { version = "0.14.2", path = "crates/fluvio-socket", default-features = false } +fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false } fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false } fluvio-smartmodule = { version = "0.5.0", path = "crates/fluvio-smartmodule", default-features = false } fluvio-storage = { path = "crates/fluvio-storage" } diff --git a/crates/fluvio-socket/Cargo.toml b/crates/fluvio-socket/Cargo.toml index 14f6e4e680..722acbf0cd 100644 --- a/crates/fluvio-socket/Cargo.toml +++ b/crates/fluvio-socket/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-socket" -version = "0.14.2" +version = "0.14.3" edition = "2021" authors = ["Fluvio Contributors "] description = "Provide TCP socket wrapper for fluvio protocol"