Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - add more multiplexing_test #3249

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fluvio-protocol = { version = "0.9.3", path = "crates/fluvio-protocol" }
fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-sc-schema = { version = "0.19.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-socket = { version = "0.14.2", path = "crates/fluvio-socket", default-features = false }
fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false }
fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false }
fluvio-smartmodule = { version = "0.5.0", path = "crates/fluvio-smartmodule", default-features = false }
fluvio-storage = { path = "crates/fluvio-storage" }
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-socket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-socket"
version = "0.14.2"
version = "0.14.3"
edition = "2021"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Provide TCP socket wrapper for fluvio protocol"
Expand Down
135 changes: 124 additions & 11 deletions crates/fluvio-socket/src/multiplexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl SharedMsg {
mod tests {

use std::time::Duration;
use std::io::ErrorKind;

use async_trait::async_trait;
use futures_util::future::{join, join3};
Expand All @@ -545,6 +546,7 @@ mod tests {
use fluvio_protocol::api::RequestMessage;

use super::MultiplexerSocket;
use super::SocketError;
use crate::test_request::*;
use crate::ExclusiveFlvSink;
use crate::FluvioSocket;
Expand All @@ -560,6 +562,9 @@ mod tests {
#[allow(unused)]
const X509_CLIENT_KEY: &str = "certs/certs/client.key";

#[allow(unused)]
const SLEEP_MS: u64 = 10;

#[async_trait]
trait AcceptorHandler {
type Stream: AsyncRead + AsyncWrite + Unpin + Send;
Expand All @@ -578,7 +583,21 @@ mod tests {
}
}

async fn test_server<A: AcceptorHandler + 'static>(addr: &str, mut handler: A) {
fn get_error_kind<T: std::fmt::Debug>(
result: Result<T, SocketError>,
) -> Option<std::io::ErrorKind> {
match result {
Err(SocketError::Io { source, .. }) => Some(source.kind()),
_ => None,
}
}

async fn test_server<A: AcceptorHandler + 'static>(
addr: &str,
mut handler: A,
nb_iter: usize,
timeout: u64,
) {
let listener = TcpListener::bind(addr).await.expect("binding");
debug!("server is running");
let mut incoming = listener.incoming();
Expand All @@ -593,7 +612,7 @@ mod tests {

let mut api_stream = stream.api_stream::<TestApiRequest, TestKafkaApiEnum>();

for i in 0..4u16 {
for i in 0..nb_iter {
debug!("server: waiting for next msg: {}", i);
let msg = api_stream.next().await.expect("msg").expect("unwrap");
debug!("server: msg received: {:#?}", msg);
Expand All @@ -605,7 +624,8 @@ mod tests {
if echo_request.request().msg == "slow" {
debug!("server: received slow msg");
spawn(async move {
sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(SLEEP_MS * 50)).await;
sleep(Duration::from_secs(timeout)).await; //simulate more waiting time from server while receiving slow msg
let resp =
echo_request.new_response(EchoResponse::new("slow".to_owned()));
debug!("server send slow response");
Expand All @@ -631,7 +651,7 @@ mod tests {
debug!("server: received async status msg");
let mut reply_sink = shared_sink.clone();
spawn(async move {
sleep(Duration::from_millis(30)).await;
sleep(Duration::from_millis(SLEEP_MS * 3)).await;
let resp = status_request.new_response(AsyncStatusResponse {
status: status_request.request.count * 2,
});
Expand All @@ -640,7 +660,7 @@ mod tests {
.await
.expect("send succeed");
debug!("server: send back status first");
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(SLEEP_MS * 10)).await;
let resp = status_request.new_response(AsyncStatusResponse {
status: status_request.request.count * 4,
});
Expand Down Expand Up @@ -676,12 +696,12 @@ mod tests {
async fn test_client<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
use std::time::SystemTime;

sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer = MultiplexerSocket::shared(socket);

// create async status
Expand Down Expand Up @@ -709,7 +729,7 @@ mod tests {
},
async move {
// this message will be send later than slow but since there is no delay, it should get earlier than first
sleep(Duration::from_millis(20)).await;
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("trying to send fast");
let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
let response = multiplexor2
Expand All @@ -721,7 +741,7 @@ mod tests {
SystemTime::now()
},
async move {
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(SLEEP_MS * 10)).await;
let response = status_response
.next()
.await
Expand Down Expand Up @@ -755,18 +775,111 @@ mod tests {
assert_eq!(response.msg, "hello");
}

async fn test_client_closed_socket<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
use std::time::SystemTime;

sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

let multiplexor2 = multiplexer.clone();

let (slow, fast) = join(
async move {
debug!("trying to send slow");
// this message was send first but since there is delay of 500ms, it will return slower than fast
let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
let response = multiplexer.send_and_receive(request).await;
assert!(response.is_err());

let err_kind = get_error_kind(response).expect("Get right Error Kind");
let expected = ErrorKind::UnexpectedEof;
assert_eq!(expected, err_kind);
debug!("client: socket was closed");

SystemTime::now()
},
async move {
// this message will be send later than slow but since there is no delay, it should get earlier than first
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("trying to send fast");
let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
let response = multiplexor2
.send_and_receive(request)
.await
.expect("send success");
debug!("received fast response");
assert_eq!(response.msg, "hello");
multiplexor2.terminate.notify(usize::MAX); //close multiplexor2
SystemTime::now()
},
)
.await;
assert!(slow > fast);
}

async fn test_client_time_out<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
let socket = handler.connect(tcp_stream).await;
debug!("client: connected to test server and waiting...");
sleep(Duration::from_millis(SLEEP_MS * 2)).await;
let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

let expected: ErrorKind = ErrorKind::TimedOut;

debug!("trying to send slow");

let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
let response = multiplexer.send_and_receive(request).await;
assert!(response.is_err());

let err_kind = get_error_kind(response).expect("Get right Error Kind");

assert_eq!(expected, err_kind);
debug!("client: socket was timeout");
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing() {
debug!("start testing");
let addr = "127.0.0.1:6000";

let _r = join(
test_client(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 4, 0),
)
.await;
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing_close_socket() {
debug!("start test_multiplexing_close_socket");
let addr = "127.0.0.1:6000";

let _r = join(
test_client_closed_socket(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 2, 0),
)
.await;
}

#[fluvio_future::test(ignore)]
async fn test_multiplexing_time_out() {
debug!("start test_multiplexing_timeout");
let addr = "127.0.0.1:6000";

let _r = join(
test_client_time_out(addr, TcpStreamHandler {}),
test_server(addr, TcpStreamHandler {}, 1, 60), //MAX_WAIT_TIME is 60 second
)
.await;
}
#[cfg(unix)]
mod tls_test {
use std::os::unix::io::AsRawFd;
Expand Down Expand Up @@ -856,7 +969,7 @@ mod tests {

let _r = join(
test_client(addr, TlsConnectorHandler::new()),
test_server(addr, TlsAcceptorHandler::new()),
test_server(addr, TlsAcceptorHandler::new(), 4, 0),
)
.await;
}
Expand Down