From fdde430c2366fb036076c9ba4525e2bd7bae3198 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Sat, 24 Aug 2024 01:12:06 -0700 Subject: [PATCH] Test: Concurrent clients for TokenRouter (#1010) This is an integration test to ensure that concurrent clients to the same proxy and endpoint didn't mix packets. Could not replicate the reported issue below, but it felt like a good test to have for concurrency testing. Work on #988 --- tests/token_router.rs | 129 ++++++++++++++++++++++++++++++++---------- 1 file changed, 99 insertions(+), 30 deletions(-) diff --git a/tests/token_router.rs b/tests/token_router.rs index 698267675f..b97d7d953a 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -16,20 +16,109 @@ use std::net::{Ipv6Addr, SocketAddr}; -use tokio::time::{timeout, Duration}; - use quilkin::{ config::Filter, filters::{Capture, StaticFilter, TokenRouter}, net::endpoint::{metadata::MetadataView, Endpoint}, test::{AddressType, TestHelper}, }; +use tokio::time::{timeout, Duration}; /// This test covers both token_router and capture filters, /// since they work in concert together. #[tokio::test] async fn token_router() { let mut t = TestHelper::default(); + + let local_addr = echo_server(&mut t).await; + + // valid packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + let msg = b"helloabc"; + tracing::trace!(%local_addr, "sending echo packet"); + socket.send_to(msg, &local_addr).await.unwrap(); + + tracing::trace!("awaiting echo packet"); + assert_eq!( + "hello", + timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + + // send an invalid packet + let msg = b"helloxyz"; + socket.send_to(msg, &local_addr).await.unwrap(); + + let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; + assert!(result.is_err(), "should not have received a packet"); +} + +// This test covers the scenario in https://github.com/googleforgames/quilkin/issues/988 +// to make sure there are no issues with overlapping streams between clients. +#[tokio::test] +async fn multiple_clients() { + let limit = 10_000; + let mut t = TestHelper::default(); + let local_addr = echo_server(&mut t).await; + + let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await; + let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await; + + tokio::spawn(async move { + // some room to breath + tokio::time::sleep(Duration::from_millis(50)).await; + for _ in 0..limit { + a_socket.send_to(b"Aabc", &local_addr).await.unwrap(); + tokio::time::sleep(Duration::from_nanos(5)).await; + } + }); + tokio::spawn(async move { + // some room to breath + tokio::time::sleep(Duration::from_millis(50)).await; + for _ in 0..limit { + b_socket.send_to(b"Babc", &local_addr).await.unwrap(); + tokio::time::sleep(Duration::from_nanos(5)).await; + } + }); + + let mut success = 0; + let mut failed = 0; + for _ in 0..limit { + match timeout(Duration::from_millis(60), a_rx.recv()).await { + Ok(packet) => { + assert_eq!("A", packet.unwrap()); + success += 1; + } + Err(_) => { + failed += 1; + } + } + match timeout(Duration::from_millis(60), b_rx.recv()).await { + Ok(packet) => { + assert_eq!("B", packet.unwrap()); + success += 1; + } + Err(_) => { + failed += 1; + } + } + } + + // allow for some dropped packets, since UDP. + let threshold = 0.95 * (2 * limit) as f64; + assert!( + success as f64 > threshold, + "Success: {}, Failed: {}", + success, + failed + ); +} + +// start an echo server and return what port it's on. +async fn echo_server(t: &mut TestHelper) -> SocketAddr { let mut echo = t.run_echo_server(AddressType::Ipv6).await; quilkin::test::map_to_localhost(&mut echo).await; @@ -47,10 +136,13 @@ quilkin.dev: let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); server_config.clusters.modify(|clusters| { clusters.insert_default( - [Endpoint::with_metadata( - echo.clone(), - serde_yaml::from_str::>(endpoint_metadata).unwrap(), - )] + [ + Endpoint::with_metadata( + echo.clone(), + serde_yaml::from_str::>(endpoint_metadata).unwrap(), + ), + "127.0.0.2:5000".parse().unwrap(), // goes nowhere, so shouldn't do anything. + ] .into(), ) }); @@ -73,28 +165,5 @@ quilkin.dev: ); let server_port = t.run_server(server_config, None, None).await; - - // valid packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - let local_addr = SocketAddr::from((Ipv6Addr::LOCALHOST, server_port)); - let msg = b"helloabc"; - tracing::trace!(%local_addr, "sending echo packet"); - socket.send_to(msg, &local_addr).await.unwrap(); - - tracing::trace!("awaiting echo packet"); - assert_eq!( - "hello", - timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); - - // send an invalid packet - let msg = b"helloxyz"; - socket.send_to(msg, &local_addr).await.unwrap(); - - let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; - assert!(result.is_err(), "should not have received a packet"); + SocketAddr::from((Ipv6Addr::LOCALHOST, server_port)) }