diff --git a/Cargo.toml b/Cargo.toml index 5460a502f2..8f5e7a4eb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,7 +149,7 @@ fluvio-protocol = { version = "0.9.3", path = "crates/fluvio-protocol" } fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false } fluvio-sc-schema = { version = "0.19.0", path = "crates/fluvio-sc-schema", default-features = false } fluvio-service = { path = "crates/fluvio-service" } -fluvio-socket = { version = "0.14.2", path = "crates/fluvio-socket", default-features = false } +fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false } fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false } fluvio-smartmodule = { version = "0.5.0", path = "crates/fluvio-smartmodule", default-features = false } fluvio-storage = { path = "crates/fluvio-storage" } diff --git a/crates/fluvio-socket/Cargo.toml b/crates/fluvio-socket/Cargo.toml index 14f6e4e680..722acbf0cd 100644 --- a/crates/fluvio-socket/Cargo.toml +++ b/crates/fluvio-socket/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-socket" -version = "0.14.2" +version = "0.14.3" edition = "2021" authors = ["Fluvio Contributors "] description = "Provide TCP socket wrapper for fluvio protocol" diff --git a/crates/fluvio-socket/src/multiplexing.rs b/crates/fluvio-socket/src/multiplexing.rs index 9d883277ec..a34237de21 100644 --- a/crates/fluvio-socket/src/multiplexing.rs +++ b/crates/fluvio-socket/src/multiplexing.rs @@ -531,6 +531,7 @@ impl SharedMsg { mod tests { use std::time::Duration; + use std::io::ErrorKind; use async_trait::async_trait; use futures_util::future::{join, join3}; @@ -545,6 +546,7 @@ mod tests { use fluvio_protocol::api::RequestMessage; use super::MultiplexerSocket; + use super::SocketError; use crate::test_request::*; use crate::ExclusiveFlvSink; use crate::FluvioSocket; @@ -560,6 +562,9 @@ mod tests { #[allow(unused)] const X509_CLIENT_KEY: &str = "certs/certs/client.key"; + #[allow(unused)] + const SLEEP_MS: u64 = 10; + #[async_trait] trait AcceptorHandler { type Stream: AsyncRead + AsyncWrite + Unpin + Send; @@ -578,7 +583,21 @@ mod tests { } } - async fn test_server(addr: &str, mut handler: A) { + fn get_error_kind( + result: Result, + ) -> Option { + match result { + Err(SocketError::Io { source, .. }) => Some(source.kind()), + _ => None, + } + } + + async fn test_server( + addr: &str, + mut handler: A, + nb_iter: usize, + timeout: u64, + ) { let listener = TcpListener::bind(addr).await.expect("binding"); debug!("server is running"); let mut incoming = listener.incoming(); @@ -593,7 +612,7 @@ mod tests { let mut api_stream = stream.api_stream::(); - for i in 0..4u16 { + for i in 0..nb_iter { debug!("server: waiting for next msg: {}", i); let msg = api_stream.next().await.expect("msg").expect("unwrap"); debug!("server: msg received: {:#?}", msg); @@ -605,7 +624,8 @@ mod tests { if echo_request.request().msg == "slow" { debug!("server: received slow msg"); spawn(async move { - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(SLEEP_MS * 50)).await; + sleep(Duration::from_secs(timeout)).await; //simulate more waiting time from server while receiving slow msg let resp = echo_request.new_response(EchoResponse::new("slow".to_owned())); debug!("server send slow response"); @@ -631,7 +651,7 @@ mod tests { debug!("server: received async status msg"); let mut reply_sink = shared_sink.clone(); spawn(async move { - sleep(Duration::from_millis(30)).await; + sleep(Duration::from_millis(SLEEP_MS * 3)).await; let resp = status_request.new_response(AsyncStatusResponse { status: status_request.request.count * 2, }); @@ -640,7 +660,7 @@ mod tests { .await .expect("send succeed"); debug!("server: send back status first"); - sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(SLEEP_MS * 10)).await; let resp = status_request.new_response(AsyncStatusResponse { status: status_request.request.count * 4, }); @@ -676,12 +696,12 @@ mod tests { async fn test_client(addr: &str, mut handler: C) { use std::time::SystemTime; - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); let socket = handler.connect(tcp_stream).await; debug!("client: connected to test server and waiting..."); - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; let multiplexer = MultiplexerSocket::shared(socket); // create async status @@ -709,7 +729,7 @@ mod tests { }, async move { // this message will be send later than slow but since there is no delay, it should get earlier than first - sleep(Duration::from_millis(20)).await; + sleep(Duration::from_millis(SLEEP_MS * 2)).await; debug!("trying to send fast"); let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned())); let response = multiplexor2 @@ -721,7 +741,7 @@ mod tests { SystemTime::now() }, async move { - sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(SLEEP_MS * 10)).await; let response = status_response .next() .await @@ -755,6 +775,76 @@ mod tests { assert_eq!(response.msg, "hello"); } + async fn test_client_closed_socket(addr: &str, mut handler: C) { + use std::time::SystemTime; + + sleep(Duration::from_millis(SLEEP_MS * 2)).await; + debug!("client: trying to connect"); + let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); + let socket = handler.connect(tcp_stream).await; + debug!("client: connected to test server and waiting..."); + sleep(Duration::from_millis(SLEEP_MS * 2)).await; + let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); + + let multiplexor2 = multiplexer.clone(); + + let (slow, fast) = join( + async move { + debug!("trying to send slow"); + // this message was send first but since there is delay of 500ms, it will return slower than fast + let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned())); + let response = multiplexer.send_and_receive(request).await; + assert!(response.is_err()); + + let err_kind = get_error_kind(response).expect("Get right Error Kind"); + let expected = ErrorKind::UnexpectedEof; + assert_eq!(expected, err_kind); + debug!("client: socket was closed"); + + SystemTime::now() + }, + async move { + // this message will be send later than slow but since there is no delay, it should get earlier than first + sleep(Duration::from_millis(SLEEP_MS * 2)).await; + debug!("trying to send fast"); + let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned())); + let response = multiplexor2 + .send_and_receive(request) + .await + .expect("send success"); + debug!("received fast response"); + assert_eq!(response.msg, "hello"); + multiplexor2.terminate.notify(usize::MAX); //close multiplexor2 + SystemTime::now() + }, + ) + .await; + assert!(slow > fast); + } + + async fn test_client_time_out(addr: &str, mut handler: C) { + sleep(Duration::from_millis(SLEEP_MS * 2)).await; + debug!("client: trying to connect"); + let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail"); + let socket = handler.connect(tcp_stream).await; + debug!("client: connected to test server and waiting..."); + sleep(Duration::from_millis(SLEEP_MS * 2)).await; + let multiplexer: std::sync::Arc = MultiplexerSocket::shared(socket); + + let expected: ErrorKind = ErrorKind::TimedOut; + + debug!("trying to send slow"); + + let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned())); + let response = multiplexer.send_and_receive(request).await; + assert!(response.is_err()); + + let err_kind = get_error_kind(response).expect("Get right Error Kind"); + + assert_eq!(expected, err_kind); + debug!("client: socket was timeout"); + } + #[fluvio_future::test(ignore)] async fn test_multiplexing() { debug!("start testing"); @@ -762,11 +852,34 @@ mod tests { let _r = join( test_client(addr, TcpStreamHandler {}), - test_server(addr, TcpStreamHandler {}), + test_server(addr, TcpStreamHandler {}, 4, 0), + ) + .await; + } + + #[fluvio_future::test(ignore)] + async fn test_multiplexing_close_socket() { + debug!("start test_multiplexing_close_socket"); + let addr = "127.0.0.1:6000"; + + let _r = join( + test_client_closed_socket(addr, TcpStreamHandler {}), + test_server(addr, TcpStreamHandler {}, 2, 0), ) .await; } + #[fluvio_future::test(ignore)] + async fn test_multiplexing_time_out() { + debug!("start test_multiplexing_timeout"); + let addr = "127.0.0.1:6000"; + + let _r = join( + test_client_time_out(addr, TcpStreamHandler {}), + test_server(addr, TcpStreamHandler {}, 1, 60), //MAX_WAIT_TIME is 60 second + ) + .await; + } #[cfg(unix)] mod tls_test { use std::os::unix::io::AsRawFd; @@ -856,7 +969,7 @@ mod tests { let _r = join( test_client(addr, TlsConnectorHandler::new()), - test_server(addr, TlsAcceptorHandler::new()), + test_server(addr, TlsAcceptorHandler::new(), 4, 0), ) .await; }