Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - add more multiplexing_test #3249

Closed
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 125 additions & 11 deletions crates/fluvio-socket/src/multiplexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl SharedMsg {
mod tests {

use std::time::Duration;
use std::io::ErrorKind;

use async_trait::async_trait;
use futures_util::future::{join, join3};
Expand All @@ -545,6 +546,7 @@ mod tests {
use fluvio_protocol::api::RequestMessage;

use super::MultiplexerSocket;
use super::SocketError;
use crate::test_request::*;
use crate::ExclusiveFlvSink;
use crate::FluvioSocket;
Expand All @@ -560,6 +562,9 @@ mod tests {
#[allow(unused)]
const X509_CLIENT_KEY: &str = "certs/certs/client.key";

#[allow(unused)]
const SLEEP_MS: u64 = 10;

#[async_trait]
trait AcceptorHandler {
type Stream: AsyncRead + AsyncWrite + Unpin + Send;
Expand All @@ -578,7 +583,21 @@ mod tests {
}
}

async fn test_server<A: AcceptorHandler + 'static>(addr: &str, mut handler: A) {
fn get_error_kind<T: std::fmt::Debug>(
result: Result<T, SocketError>,
) -> Option<std::io::ErrorKind> {
match result.unwrap_err() {
SocketError::Io { source, .. } => Some(source.kind()),
galibey marked this conversation as resolved.
Show resolved Hide resolved
_ => None,
}
}

async fn test_server<A: AcceptorHandler + 'static>(
addr: &str,
mut handler: A,
nb_iter: usize,
timeout: u64,
) {
let listener = TcpListener::bind(addr).await.expect("binding");
debug!("server is running");
let mut incoming = listener.incoming();
Expand All @@ -593,7 +612,7 @@ mod tests {

let mut api_stream = stream.api_stream::<TestApiRequest, TestKafkaApiEnum>();

for i in 0..4u16 {
for i in 0..nb_iter {
debug!("server: waiting for next msg: {}", i);
let msg = api_stream.next().await.expect("msg").expect("unwrap");
debug!("server: msg received: {:#?}", msg);
Expand All @@ -605,7 +624,8 @@ mod tests {
if echo_request.request().msg == "slow" {
debug!("server: received slow msg");
spawn(async move {
sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(SLEEP_MS * 50)).await;
sleep(Duration::from_secs(timeout)).await;
let resp =
echo_request.new_response(EchoResponse::new("slow".to_owned()));
debug!("server send slow response");
Expand All @@ -631,7 +651,7 @@ mod tests {
debug!("server: received async status msg");
let mut reply_sink = shared_sink.clone();
spawn(async move {
sleep(Duration::from_millis(30)).await;
sleep(Duration::from_millis(SLEEP_MS * 3)).await;
let resp = status_request.new_response(AsyncStatusResponse {
status: status_request.request.count * 2,
});
Expand All @@ -640,7 +660,7 @@ mod tests {
.await
.expect("send succeed");
debug!("server: send back status first");
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(SLEEP_MS * 10)).await;
let resp = status_request.new_response(AsyncStatusResponse {
status: status_request.request.count * 4,
});
Expand Down Expand Up @@ -676,12 +696,12 @@ mod tests {
async fn test_client<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
use std::time::SystemTime;

sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer = MultiplexerSocket::shared(socket);

// create async status
Expand Down Expand Up @@ -709,7 +729,7 @@ mod tests {
},
async move {
// this message will be send later than slow but since there is no delay, it should get earlier than first
sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("trying to send fast");
let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
let response = multiplexor2
Expand All @@ -721,7 +741,7 @@ mod tests {
SystemTime::now()
},
async move {
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(SLEEP_MS * 10)).await;
let response = status_response
.next()
.await
Expand Down Expand Up @@ -755,18 +775,112 @@ mod tests {
assert_eq!(response.msg, "hello");
}

async fn test_client_closed_socket<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
use std::time::SystemTime;

sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

let multiplexor2 = multiplexer.clone();

let (slow, fast) = join(
async move {
debug!("trying to send slow");
// this message was send first but since there is delay of 500ms, it will return slower than fast
let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
let response = multiplexer.send_and_receive(request).await;
assert!(response.is_err());

let err_kind = get_error_kind(response);
let expected = ErrorKind::UnexpectedEof;
assert_eq!(expected, err_kind.unwrap());
debug!("client: socket was closed");

SystemTime::now()
},
async move {
// this message will be send later than slow but since there is no delay, it should get earlier than first
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("trying to send fast");
let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
let response = multiplexor2
.send_and_receive(request)
.await
.expect("send success");
debug!("received fast response");
assert_eq!(response.msg, "hello");
multiplexor2.terminate.notify(usize::MAX); //close multiplexor2
SystemTime::now()
},
)
.await;
assert!(slow > fast);
}

async fn test_client_time_out<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

let fut = async move {
galibey marked this conversation as resolved.
Show resolved Hide resolved
debug!("trying to send slow");

let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
let response = multiplexer.send_and_receive(request).await;
assert!(response.is_err());

let err_kind = get_error_kind(response);
let expected: ErrorKind = ErrorKind::TimedOut;
assert_eq!(expected, err_kind.unwrap());
debug!("client: socket was timeout");
};
fut.await;
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing() {
debug!("start testing");
let addr = "127.0.0.1:6000";

let _r = join(
test_client(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 4, 0),
)
.await;
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing_close_socket() {
debug!("start test_multiplexing_close_socket");
let addr = "127.0.0.1:6000";

let _r = join(
test_client_closed_socket(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 2, 0),
)
.await;
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing_time_out() {
debug!("start test_multiplexing_timeout");
let addr = "127.0.0.1:6000";

let _r = join(
test_client_time_out(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 1, 60), //MAX_WAIT_TIME is 60 second
)
.await;
}
#[cfg(unix)]
mod tls_test {
use std::os::unix::io::AsRawFd;
Expand Down Expand Up @@ -856,7 +970,7 @@ mod tests {

let _r = join(
test_client(addr, TlsConnectorHandler::new()),
test_server(addr, TlsAcceptorHandler::new()),
test_server(addr, TlsAcceptorHandler::new(), 4, 0),
)
.await;
}
Expand Down