Skip to content

Commit 7323b9d

Browse files
committed
Test: Concurrent clients for TokenRouter
This is an integration test to ensure that concurrent clients to the same proxy and endpoint didn't mix packets. Could not replicate the reported issue below, but it felt like a good test to have for concurrency testing. Work on googleforgames#988
1 parent f07777c commit 7323b9d

File tree

1 file changed

+99
-30
lines changed

1 file changed

+99
-30
lines changed

tests/token_router.rs

Lines changed: 99 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,109 @@
1616

1717
use std::net::{Ipv6Addr, SocketAddr};
1818

19-
use tokio::time::{timeout, Duration};
20-
2119
use quilkin::{
2220
config::Filter,
2321
filters::{Capture, StaticFilter, TokenRouter},
2422
net::endpoint::{metadata::MetadataView, Endpoint},
2523
test::{AddressType, TestHelper},
2624
};
25+
use tokio::time::{timeout, Duration};
2726

2827
/// This test covers both token_router and capture filters,
2928
/// since they work in concert together.
3029
#[tokio::test]
3130
async fn token_router() {
3231
let mut t = TestHelper::default();
32+
33+
let local_addr = echo_server(&mut t).await;
34+
35+
// valid packet
36+
let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
37+
38+
let msg = b"helloabc";
39+
tracing::trace!(%local_addr, "sending echo packet");
40+
socket.send_to(msg, &local_addr).await.unwrap();
41+
42+
tracing::trace!("awaiting echo packet");
43+
assert_eq!(
44+
"hello",
45+
timeout(Duration::from_millis(500), recv_chan.recv())
46+
.await
47+
.expect("should have received a packet")
48+
.unwrap()
49+
);
50+
51+
// send an invalid packet
52+
let msg = b"helloxyz";
53+
socket.send_to(msg, &local_addr).await.unwrap();
54+
55+
let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
56+
assert!(result.is_err(), "should not have received a packet");
57+
}
58+
59+
// This test covers the scenario in https://github.com/googleforgames/quilkin/issues/988
60+
// to make sure there are no issues with overlapping streams between clients.
61+
#[tokio::test]
62+
async fn multiple_clients() {
63+
let limit = 10_000;
64+
let mut t = TestHelper::default();
65+
let local_addr = echo_server(&mut t).await;
66+
67+
let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await;
68+
let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await;
69+
70+
tokio::spawn(async move {
71+
// some room to breath
72+
tokio::time::sleep(Duration::from_millis(50)).await;
73+
for _ in 0..limit {
74+
a_socket.send_to(b"Aabc", &local_addr).await.unwrap();
75+
tokio::time::sleep(Duration::from_nanos(5)).await;
76+
}
77+
});
78+
tokio::spawn(async move {
79+
// some room to breath
80+
tokio::time::sleep(Duration::from_millis(50)).await;
81+
for _ in 0..limit {
82+
b_socket.send_to(b"Babc", &local_addr).await.unwrap();
83+
tokio::time::sleep(Duration::from_nanos(5)).await;
84+
}
85+
});
86+
87+
let mut success = 0;
88+
let mut failed = 0;
89+
for _ in 0..limit {
90+
match timeout(Duration::from_millis(60), a_rx.recv()).await {
91+
Ok(packet) => {
92+
assert_eq!("A", packet.unwrap());
93+
success += 1;
94+
}
95+
Err(_) => {
96+
failed += 1;
97+
}
98+
}
99+
match timeout(Duration::from_millis(60), b_rx.recv()).await {
100+
Ok(packet) => {
101+
assert_eq!("B", packet.unwrap());
102+
success += 1;
103+
}
104+
Err(_) => {
105+
failed += 1;
106+
}
107+
}
108+
}
109+
110+
// allow for some dropped packets, since UDP.
111+
let threshold = 0.95 * (2 * limit) as f64;
112+
assert!(
113+
success as f64 > threshold,
114+
"Success: {}, Failed: {}",
115+
success,
116+
failed
117+
);
118+
}
119+
120+
// start an echo server and return what port it's on.
121+
async fn echo_server(t: &mut TestHelper) -> SocketAddr {
33122
let mut echo = t.run_echo_server(AddressType::Ipv6).await;
34123
quilkin::test::map_to_localhost(&mut echo).await;
35124

@@ -47,10 +136,13 @@ quilkin.dev:
47136
let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
48137
server_config.clusters.modify(|clusters| {
49138
clusters.insert_default(
50-
[Endpoint::with_metadata(
51-
echo.clone(),
52-
serde_yaml::from_str::<MetadataView<_>>(endpoint_metadata).unwrap(),
53-
)]
139+
[
140+
Endpoint::with_metadata(
141+
echo.clone(),
142+
serde_yaml::from_str::<MetadataView<_>>(endpoint_metadata).unwrap(),
143+
),
144+
"127.0.0.2:5000".parse().unwrap(), // goes nowhere, so shouldn't do anything.
145+
]
54146
.into(),
55147
)
56148
});
@@ -73,28 +165,5 @@ quilkin.dev:
73165
);
74166

75167
let server_port = t.run_server(server_config, None, None).await;
76-
77-
// valid packet
78-
let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
79-
80-
let local_addr = SocketAddr::from((Ipv6Addr::LOCALHOST, server_port));
81-
let msg = b"helloabc";
82-
tracing::trace!(%local_addr, "sending echo packet");
83-
socket.send_to(msg, &local_addr).await.unwrap();
84-
85-
tracing::trace!("awaiting echo packet");
86-
assert_eq!(
87-
"hello",
88-
timeout(Duration::from_millis(500), recv_chan.recv())
89-
.await
90-
.expect("should have received a packet")
91-
.unwrap()
92-
);
93-
94-
// send an invalid packet
95-
let msg = b"helloxyz";
96-
socket.send_to(msg, &local_addr).await.unwrap();
97-
98-
let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
99-
assert!(result.is_err(), "should not have received a packet");
168+
SocketAddr::from((Ipv6Addr::LOCALHOST, server_port))
100169
}

0 commit comments

Comments
 (0)