Skip to content

Commit

Permalink
add more multiplexing_test (#3249)
Browse files Browse the repository at this point in the history
This is an attempt to add more integration test for #3174 . 
Need to be update more case + fuzz test
  • Loading branch information
TanNgocDo committed May 16, 2023
1 parent 0377bae commit c39f2df
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fluvio-protocol = { version = "0.9.3", path = "crates/fluvio-protocol" }
fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-sc-schema = { version = "0.19.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-socket = { version = "0.14.2", path = "crates/fluvio-socket", default-features = false }
fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false }
fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false }
fluvio-smartmodule = { version = "0.5.0", path = "crates/fluvio-smartmodule", default-features = false }
fluvio-storage = { path = "crates/fluvio-storage" }
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-socket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-socket"
version = "0.14.2"
version = "0.14.3"
edition = "2021"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Provide TCP socket wrapper for fluvio protocol"
Expand Down
135 changes: 124 additions & 11 deletions crates/fluvio-socket/src/multiplexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl SharedMsg {
mod tests {

use std::time::Duration;
use std::io::ErrorKind;

use async_trait::async_trait;
use futures_util::future::{join, join3};
Expand All @@ -545,6 +546,7 @@ mod tests {
use fluvio_protocol::api::RequestMessage;

use super::MultiplexerSocket;
use super::SocketError;
use crate::test_request::*;
use crate::ExclusiveFlvSink;
use crate::FluvioSocket;
Expand All @@ -560,6 +562,9 @@ mod tests {
#[allow(unused)]
const X509_CLIENT_KEY: &str = "certs/certs/client.key";

#[allow(unused)]
const SLEEP_MS: u64 = 10;

#[async_trait]
trait AcceptorHandler {
type Stream: AsyncRead + AsyncWrite + Unpin + Send;
Expand All @@ -578,7 +583,21 @@ mod tests {
}
}

async fn test_server<A: AcceptorHandler + 'static>(addr: &str, mut handler: A) {
fn get_error_kind<T: std::fmt::Debug>(
result: Result<T, SocketError>,
) -> Option<std::io::ErrorKind> {
match result {
Err(SocketError::Io { source, .. }) => Some(source.kind()),
_ => None,
}
}

async fn test_server<A: AcceptorHandler + 'static>(
addr: &str,
mut handler: A,
nb_iter: usize,
timeout: u64,
) {
let listener = TcpListener::bind(addr).await.expect("binding");
debug!("server is running");
let mut incoming = listener.incoming();
Expand All @@ -593,7 +612,7 @@ mod tests {

let mut api_stream = stream.api_stream::<TestApiRequest, TestKafkaApiEnum>();

for i in 0..4u16 {
for i in 0..nb_iter {
debug!("server: waiting for next msg: {}", i);
let msg = api_stream.next().await.expect("msg").expect("unwrap");
debug!("server: msg received: {:#?}", msg);
Expand All @@ -605,7 +624,8 @@ mod tests {
if echo_request.request().msg == "slow" {
debug!("server: received slow msg");
spawn(async move {
sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(SLEEP_MS * 50)).await;
sleep(Duration::from_secs(timeout)).await; //simulate more waiting time from server while receiving slow msg
let resp =
echo_request.new_response(EchoResponse::new("slow".to_owned()));
debug!("server send slow response");
Expand All @@ -631,7 +651,7 @@ mod tests {
debug!("server: received async status msg");
let mut reply_sink = shared_sink.clone();
spawn(async move {
sleep(Duration::from_millis(30)).await;
sleep(Duration::from_millis(SLEEP_MS * 3)).await;
let resp = status_request.new_response(AsyncStatusResponse {
status: status_request.request.count * 2,
});
Expand All @@ -640,7 +660,7 @@ mod tests {
.await
.expect("send succeed");
debug!("server: send back status first");
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(SLEEP_MS * 10)).await;
let resp = status_request.new_response(AsyncStatusResponse {
status: status_request.request.count * 4,
});
Expand Down Expand Up @@ -676,12 +696,12 @@ mod tests {
async fn test_client<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
use std::time::SystemTime;

sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer = MultiplexerSocket::shared(socket);

// create async status
Expand Down Expand Up @@ -709,7 +729,7 @@ mod tests {
},
async move {
// this message will be send later than slow but since there is no delay, it should get earlier than first
sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("trying to send fast");
let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
let response = multiplexor2
Expand All @@ -721,7 +741,7 @@ mod tests {
SystemTime::now()
},
async move {
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(SLEEP_MS * 10)).await;
let response = status_response
.next()
.await
Expand Down Expand Up @@ -755,18 +775,111 @@ mod tests {
assert_eq!(response.msg, "hello");
}

async fn test_client_closed_socket<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
use std::time::SystemTime;

sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

let multiplexor2 = multiplexer.clone();

let (slow, fast) = join(
async move {
debug!("trying to send slow");
// this message was send first but since there is delay of 500ms, it will return slower than fast
let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
let response = multiplexer.send_and_receive(request).await;
assert!(response.is_err());

let err_kind = get_error_kind(response).expect("Get right Error Kind");
let expected = ErrorKind::UnexpectedEof;
assert_eq!(expected, err_kind);
debug!("client: socket was closed");

SystemTime::now()
},
async move {
// this message will be send later than slow but since there is no delay, it should get earlier than first
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("trying to send fast");
let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
let response = multiplexor2
.send_and_receive(request)
.await
.expect("send success");
debug!("received fast response");
assert_eq!(response.msg, "hello");
multiplexor2.terminate.notify(usize::MAX); //close multiplexor2
SystemTime::now()
},
)
.await;
assert!(slow > fast);
}

async fn test_client_time_out<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

let expected: ErrorKind = ErrorKind::TimedOut;

debug!("trying to send slow");

let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
let response = multiplexer.send_and_receive(request).await;
assert!(response.is_err());

let err_kind = get_error_kind(response).expect("Get right Error Kind");

assert_eq!(expected, err_kind);
debug!("client: socket was timeout");
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing() {
debug!("start testing");
let addr = "127.0.0.1:6000";

let _r = join(
test_client(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 4, 0),
)
.await;
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing_close_socket() {
debug!("start test_multiplexing_close_socket");
let addr = "127.0.0.1:6000";

let _r = join(
test_client_closed_socket(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 2, 0),
)
.await;
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing_time_out() {
debug!("start test_multiplexing_timeout");
let addr = "127.0.0.1:6000";

let _r = join(
test_client_time_out(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 1, 60), //MAX_WAIT_TIME is 60 second
)
.await;
}
#[cfg(unix)]
mod tls_test {
use std::os::unix::io::AsRawFd;
Expand Down Expand Up @@ -856,7 +969,7 @@ mod tests {

let _r = join(
test_client(addr, TlsConnectorHandler::new()),
test_server(addr, TlsAcceptorHandler::new()),
test_server(addr, TlsAcceptorHandler::new(), 4, 0),
)
.await;
}
Expand Down

0 comments on commit c39f2df

Please sign in to comment.