diff --git a/Cargo.toml b/Cargo.toml index 8b78803..4bee23c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "web-transport", + "web-transport-browser-tests", "web-transport-proto", "web-transport-quiche", "web-transport-quinn", diff --git a/justfile b/justfile index 211b07d..3920d71 100644 --- a/justfile +++ b/justfile @@ -21,8 +21,8 @@ setup-tools: # Run the CI checks check: - cargo check --workspace --all-targets --all-features - cargo clippy --workspace --all-targets --all-features -- -D warnings + cargo check --workspace --all-targets --all-features --exclude web-transport-browser-tests + cargo clippy --workspace --all-targets --all-features --exclude web-transport-browser-tests -- -D warnings # Do the same but explicitly use the WASM target. cargo check --target wasm32-unknown-unknown -p web-transport --all-targets --all-features @@ -34,7 +34,7 @@ check: cargo fmt --all --check # requires: cargo install cargo-hack - cargo hack check --feature-powerset --workspace --keep-going + cargo hack check --feature-powerset --workspace --keep-going --exclude web-transport-browser-tests cargo hack check --feature-powerset --target wasm32-unknown-unknown -p web-transport --keep-going cargo hack check --feature-powerset --target wasm32-unknown-unknown -p web-transport-wasm --keep-going @@ -50,14 +50,18 @@ check: # Run any CI tests test: - cargo test --workspace --all-targets --all-features + cargo test --workspace --all-targets --all-features --exclude web-transport-browser-tests cargo test --target wasm32-unknown-unknown -p web-transport --all-targets --all-features cargo test --target wasm32-unknown-unknown -p web-transport-wasm --all-targets --all-features +# Run browser interoperability tests (requires Chromium or auto-downloads it) +browser-test: + cargo test -p web-transport-browser-tests + # Automatically fix some issues. fix: - cargo fix --allow-staged --allow-dirty --workspace --all-targets --all-features - cargo clippy --fix --allow-staged --allow-dirty --workspace --all-targets --all-features + cargo fix --allow-staged --allow-dirty --workspace --all-targets --all-features --exclude web-transport-browser-tests + cargo clippy --fix --allow-staged --allow-dirty --workspace --all-targets --all-features --exclude web-transport-browser-tests # Do the same but explicitly use the WASM target. cargo fix --allow-staged --allow-dirty --target wasm32-unknown-unknown -p web-transport --all-targets --all-features diff --git a/web-transport-browser-tests/Cargo.toml b/web-transport-browser-tests/Cargo.toml new file mode 100644 index 0000000..1e0543c --- /dev/null +++ b/web-transport-browser-tests/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "web-transport-browser-tests" +description = "Browser interoperability test infrastructure for web-transport" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +anyhow = "1" +chromiumoxide = { version = "0.9", default-features = false, features = [ + "bytes", + "fetcher", + "rustls", + "zip8", +] } +futures = "0.3" +rcgen = { version = "0.14", default-features = false, features = ["aws_lc_rs"] } +rustls = { version = "0.23", default-features = false, features = [ + "aws-lc-rs", + "std", +] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +time = "0.3" +tokio = { version = "1", features = ["full"] } +tracing = "0.1" + +[dependencies.web-transport-quinn] +path = "../web-transport-quinn" + +[dev-dependencies] +bytes = "1" +http = "1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/web-transport-browser-tests/src/browser.rs b/web-transport-browser-tests/src/browser.rs new file mode 100644 index 0000000..c45a558 --- /dev/null +++ b/web-transport-browser-tests/src/browser.rs @@ -0,0 +1,292 @@ +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::OnceLock; +use std::time::Duration; + +use anyhow::{Context, Result}; +use chromiumoxide::browser::{Browser, BrowserConfig}; +use chromiumoxide::cdp::browser_protocol::browser::BrowserContextId; +use chromiumoxide::cdp::browser_protocol::target::{ + CreateBrowserContextParams, CreateTargetParams, +}; +use chromiumoxide::fetcher::{BrowserFetcher, BrowserFetcherOptions}; +use chromiumoxide::Page; +use futures::StreamExt; +use serde::Deserialize; + +use crate::js; + +struct SharedBrowser { + browser: tokio::sync::Mutex, + /// URL of the blank page server (http://localhost:{port}). + page_url: String, + // A dedicated Tokio runtime that owns the browser event handler and HTTP + // server tasks. This runtime lives as long as the `SharedBrowser` (i.e. + // for the entire process) so that the tasks are not cancelled when + // individual `#[tokio::test]` runtimes shut down. + _runtime: tokio::runtime::Runtime, + // A page that stays open for the lifetime of the browser to prevent + // Chrome from exiting when all test contexts are disposed. + _keepalive_page: Page, + // Holds the stdin pipe to the cleanup watchdog process. When our process + // exits (for any reason, including SIGKILL), the OS closes this FD, + // unblocking the watchdog which then kills Chrome and removes the temp dir. + _watchdog_pipe: std::sync::Mutex>, +} + +static BROWSER: OnceLock = OnceLock::new(); + +/// Try to build a BrowserConfig, auto-downloading Chromium via the fetcher if +/// no local executable is detected. +async fn build_browser_config() -> BrowserConfig { + // Use a per-process data dir to avoid stale SingletonLock conflicts. + let data_dir = std::env::temp_dir().join(format!("chromiumoxide-{}", std::process::id())); + + // Note: .arg() auto-prefixes with "--", so pass bare names. + let builder = BrowserConfig::builder() + .new_headless_mode() + .no_sandbox() + .user_data_dir(&data_dir) + .arg("allow-insecure-localhost"); + + match builder.clone().build() { + Ok(config) => return config, + Err(_) => { + tracing::info!("no local Chrome found, downloading via fetcher..."); + } + } + + // Auto-detection failed — download Chromium. + let exe_path = fetch_chromium().await; + + builder + .chrome_executable(exe_path) + .build() + .expect("failed to build browser config with fetched executable") +} + +/// Download Chromium using chromiumoxide_fetcher and return the executable path. +async fn fetch_chromium() -> PathBuf { + // Use the same cache directory as the fetcher's default + // ($XDG_CACHE_HOME or $HOME/.cache)/chromiumoxide. We must create it + // ourselves because the fetcher doesn't create parent directories. + let base = std::env::var("XDG_CACHE_HOME") + .map(PathBuf::from) + .unwrap_or_else(|_| { + PathBuf::from(std::env::var("HOME").expect("HOME not set")).join(".cache") + }); + let cache_dir = base.join("chromiumoxide"); + std::fs::create_dir_all(&cache_dir).expect("failed to create browser cache directory"); + + let options = BrowserFetcherOptions::builder() + .with_path(&cache_dir) + .build() + .expect("failed to create fetcher options"); + + let fetcher = BrowserFetcher::new(options); + let installation = fetcher.fetch().await.expect("failed to download Chromium"); + + tracing::info!(path = %installation.executable_path.display(), "downloaded Chromium"); + installation.executable_path +} + +/// Spawn a background shell that blocks on stdin. When our process exits (for +/// any reason), the OS closes the pipe, `read` gets EOF, and the shell kills +/// Chrome and removes its data directory. +fn spawn_cleanup_watchdog( + chrome_pid: u32, + data_dir: &std::path::Path, +) -> Option { + use std::process::{Command, Stdio}; + + // Pass chrome_pid and data_dir as positional arguments ($0, $1) to avoid + // shell injection from interpolating paths into the command string. + let mut child = Command::new("sh") + .arg("-c") + .arg(r#"read _; kill -9 "$0" 2>/dev/null; rm -rf "$1""#) + .arg(chrome_pid.to_string()) + .arg(data_dir.as_os_str()) + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .ok()?; + child.stdin.take() +} + +fn init_browser() -> SharedBrowser { + // Create a dedicated runtime on a separate thread. We cannot call + // `block_on` from within an existing Tokio runtime (which `#[tokio::test]` + // creates), so we spawn a plain OS thread to build and initialise + // everything, then send the results back. + std::thread::spawn(|| { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .expect("failed to build dedicated browser runtime"); + + let (browser, page_url, keepalive_page, chrome_pid) = rt.block_on(async { + let config = build_browser_config().await; + + // Start the blank page server on this runtime. + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("failed to bind blank page server"); + let addr: SocketAddr = listener.local_addr().unwrap(); + let page_url = format!("http://localhost:{}", addr.port()); + + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + tokio::spawn(async move { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let mut buf = [0u8; 1024]; + let _ = stream.read(&mut buf).await; + let response = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: 13\r\n\r\n"; + let _ = stream.write_all(response.as_bytes()).await; + }); + } + }); + + let (mut browser, mut handler) = Browser::launch(config) + .await + .expect("failed to launch browser"); + + // Extract Chrome's PID before we lose mutability, so the watchdog + // can kill it on abnormal exit. + let chrome_pid = browser.get_mut_child().and_then(|c| c.inner.id()); + + // Spawn the CDP event handler on this runtime. + tokio::spawn(async move { + while let Some(event) = handler.next().await { + let _ = event; + } + }); + + // Open a keepalive page in the default context so Chrome doesn't + // exit when all test browser contexts are disposed. + let keepalive_page = browser + .new_page(&page_url) + .await + .expect("failed to create keepalive page"); + + (browser, page_url, keepalive_page, chrome_pid) + }); + + // Spawn a watchdog that kills Chrome and cleans up the temp dir when + // our process exits. The data dir path matches build_browser_config(). + let data_dir = + std::env::temp_dir().join(format!("chromiumoxide-{}", std::process::id())); + let watchdog_pipe = chrome_pid.and_then(|pid| spawn_cleanup_watchdog(pid, &data_dir)); + + SharedBrowser { + browser: tokio::sync::Mutex::new(browser), + page_url, + _runtime: rt, + _keepalive_page: keepalive_page, + _watchdog_pipe: std::sync::Mutex::new(watchdog_pipe), + } + }) + .join() + .expect("browser init thread panicked") +} + +fn get_browser() -> &'static SharedBrowser { + BROWSER.get_or_init(init_browser) +} + +/// An isolated browser context for a single test. +/// +/// Each context has its own cookies, cache, and page — preventing +/// cross-test interference. +pub struct TestContext { + page: Page, + context_id: Option, +} + +impl TestContext { + /// Create a new isolated browser context. + pub async fn new() -> Result { + let shared = get_browser(); + + let context_id = shared + .browser + .lock() + .await + .create_browser_context(CreateBrowserContextParams::default()) + .await + .context("failed to create browser context")?; + + // Navigate to a localhost HTTP page so the JS context has a secure + // origin with the WebTransport API available. + let page = shared + .browser + .lock() + .await + .new_page( + CreateTargetParams::builder() + .url(&shared.page_url) + .browser_context_id(context_id.clone()) + .build() + .unwrap(), + ) + .await + .context("failed to create page")?; + + Ok(Self { + page, + context_id: Some(context_id), + }) + } + + /// Evaluate a JavaScript test snippet in the browser. + /// + /// The `js_code` is wrapped with `connectWebTransport()` and error handling + /// via [`js::wrap_test_js`]. + pub async fn run_js_test( + &self, + server_url: &str, + certificate_hash: &[u8], + js_code: &str, + timeout: Duration, + ) -> Result { + let wrapped = js::wrap_test_js(server_url, certificate_hash, js_code); + + let result: String = tokio::time::timeout(timeout, self.page.evaluate(wrapped)) + .await + .context("JS test timed out")? + .context("JS evaluation failed")? + .into_value() + .context("failed to extract JS result value")?; + + let parsed: JsTestResult = + serde_json::from_str(&result).context("failed to parse JS test result")?; + + Ok(parsed) + } + + /// Dispose of the browser context. + pub async fn dispose(mut self) { + if let Some(id) = self.context_id.take() { + let shared = get_browser(); + let _ = shared + .browser + .lock() + .await + .dispose_browser_context(id) + .await; + } + } +} + +/// The result of a JavaScript test evaluation. +#[derive(Debug, Deserialize)] +pub struct JsTestResult { + pub success: bool, + pub message: String, + #[serde(default)] + pub details: serde_json::Value, +} diff --git a/web-transport-browser-tests/src/cert.rs b/web-transport-browser-tests/src/cert.rs new file mode 100644 index 0000000..9209dac --- /dev/null +++ b/web-transport-browser-tests/src/cert.rs @@ -0,0 +1,55 @@ +use rcgen::{CertificateParams, KeyPair, SanType}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use web_transport_quinn::crypto; + +/// A self-signed certificate for use in tests, along with its SHA-256 fingerprint. +pub struct TestCert { + pub chain: Vec>, + pub key: PrivateKeyDer<'static>, + /// 32 raw SHA-256 bytes. + pub fingerprint: Vec, + /// Hex-encoded fingerprint for logging. + pub fingerprint_hex: String, +} + +/// Generate a self-signed EC P-256 certificate valid for 10 days. +/// +/// The certificate has SANs for `localhost` and `127.0.0.1`. +pub fn generate() -> TestCert { + let key_pair = KeyPair::generate().expect("failed to generate key pair"); + + let mut params = CertificateParams::default(); + params.subject_alt_names = vec![ + SanType::DnsName("localhost".try_into().unwrap()), + SanType::IpAddress("127.0.0.1".parse().unwrap()), + ]; + // Chrome requires serverCertificateHashes certs to be valid for at most 14 days. + let now = time::OffsetDateTime::now_utc(); + params.not_before = now; + params.not_after = now + time::Duration::days(10); + + let cert = params + .self_signed(&key_pair) + .expect("failed to self-sign certificate"); + + let cert_der = CertificateDer::from(cert); + let key_der = PrivateKeyDer::from(key_pair); + + // Compute fingerprint using the same function as the server-side code. + let provider = crypto::default_provider(); + let hash = crypto::sha256(&provider, &cert_der); + let fingerprint = hash.as_ref().to_vec(); + let fingerprint_hex = fingerprint + .iter() + .map(|b| format!("{b:02x}")) + .collect::(); + + tracing::debug!(fingerprint = %fingerprint_hex, "generated test certificate"); + + TestCert { + chain: vec![cert_der], + key: key_der, + fingerprint, + fingerprint_hex, + } +} diff --git a/web-transport-browser-tests/src/harness.rs b/web-transport-browser-tests/src/harness.rs new file mode 100644 index 0000000..794e0d2 --- /dev/null +++ b/web-transport-browser-tests/src/harness.rs @@ -0,0 +1,78 @@ +use std::time::Duration; + +use anyhow::Result; + +use crate::browser::{JsTestResult, TestContext}; +use crate::cert::{self, TestCert}; +use crate::server::{self, RequestHandler, ServerHandler, TestServer}; + +/// Orchestrates certificate generation, server startup, and browser context +/// creation for a single test. +pub struct TestHarness { + pub server: TestServer, + pub context: TestContext, + pub cert: TestCert, +} + +/// Set up a complete test environment: generate a cert, start a server with the +/// given handler, launch (or reuse) the browser, and create an isolated context. +pub async fn setup(handler: ServerHandler) -> Result { + let cert = cert::generate(); + let server = server::start(&cert, handler).await?; + let context = TestContext::new().await?; + + Ok(TestHarness { + server, + context, + cert, + }) +} + +impl TestHarness { + /// Evaluate JavaScript test code in the browser and return the result. + pub async fn run_js(&self, js_code: &str, timeout: Duration) -> Result { + self.context + .run_js_test(&self.server.url, &self.cert.fingerprint, js_code, timeout) + .await + } + + /// Evaluate JavaScript test code and assert that it succeeded. + pub async fn run_js_ok(&self, js_code: &str, timeout: Duration) { + let result = self + .run_js(js_code, timeout) + .await + .unwrap_or_else(|e| panic!("JS test failed with error: {e:#}")); + assert!(result.success, "JS test failed: {}", result.message); + } + + /// Dispose of the browser context and shut down the server, asserting that + /// exactly one handler invocation completed. + pub async fn teardown(self) { + self.context.dispose().await; + self.server.shutdown(1).await; + } + + /// Dispose of the browser context and shut down the server, asserting that + /// exactly `expected_handlers` handler invocations completed. + pub async fn teardown_expecting(self, expected_handlers: usize) { + self.context.dispose().await; + self.server.shutdown(expected_handlers).await; + } +} + +/// Set up a complete test environment using a [RequestHandler] that receives the +/// raw request before acceptance. +pub async fn setup_with_request_handler(handler: RequestHandler) -> Result { + let cert = cert::generate(); + let server = server::start_with_request_handler(&cert, handler).await?; + let context = TestContext::new().await?; + + Ok(TestHarness { + server, + context, + cert, + }) +} + +// Re-export handler constructors for convenience. +pub use crate::server::{echo_handler, idle_handler, immediate_close_handler}; diff --git a/web-transport-browser-tests/src/js.rs b/web-transport-browser-tests/src/js.rs new file mode 100644 index 0000000..100f4e8 --- /dev/null +++ b/web-transport-browser-tests/src/js.rs @@ -0,0 +1,64 @@ +/// Wrap user-provided JavaScript test code with a `connectWebTransport()` helper +/// and error handling. +/// +/// The returned string is a self-contained async IIFE that can be evaluated +/// directly via `page.evaluate()`. +pub fn wrap_test_js(server_url: &str, certificate_hash: &[u8], user_code: &str) -> String { + let hash_bytes = certificate_hash + .iter() + .map(|b| b.to_string()) + .collect::>() + .join(", "); + + format!( + r#"(async () => {{ + const SERVER_URL = "{server_url}"; + const CERT_HASH = new Uint8Array([{hash_bytes}]); + + async function connectWebTransport(url, options) {{ + const wt = new WebTransport(url || SERVER_URL, {{ + serverCertificateHashes: [{{ + algorithm: "sha-256", + value: CERT_HASH, + }}], + ...(options || {{}}), + }}); + await wt.ready; + return wt; + }} + + try {{ + const result = await (async () => {{ {user_code} }})(); + if (result === undefined || result === null) {{ + return JSON.stringify({{ success: false, message: "test code did not return a result" }}); + }} + return JSON.stringify(result); + }} catch (e) {{ + const msg = "JS exception: " + (e.stack ? e.stack : e.toString()); + return JSON.stringify({{ + success: false, + message: msg, + details: {{ name: e.name, stack: e.stack }} + }}); + }} +}})()"# + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn wrap_produces_valid_js() { + let js = wrap_test_js( + "https://localhost:4443", + &[0xab, 0xcd, 0x01], + "return { success: true, message: 'hello' };", + ); + assert!(js.contains("SERVER_URL")); + assert!(js.contains("171, 205, 1")); + assert!(js.contains("connectWebTransport")); + assert!(js.contains("return { success: true, message: 'hello' };")); + } +} diff --git a/web-transport-browser-tests/src/lib.rs b/web-transport-browser-tests/src/lib.rs new file mode 100644 index 0000000..d8532e2 --- /dev/null +++ b/web-transport-browser-tests/src/lib.rs @@ -0,0 +1,5 @@ +pub mod browser; +pub mod cert; +pub mod harness; +pub mod js; +pub mod server; diff --git a/web-transport-browser-tests/src/server.rs b/web-transport-browser-tests/src/server.rs new file mode 100644 index 0000000..cbce0e8 --- /dev/null +++ b/web-transport-browser-tests/src/server.rs @@ -0,0 +1,268 @@ +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use anyhow::Result; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; + +use crate::cert::TestCert; + +type TaskHandles = Arc>>>; + +/// A boxed async handler invoked for each accepted WebTransport session. +pub type ServerHandler = Box< + dyn FnMut(web_transport_quinn::Session) -> Pin + Send>> + + Send + + 'static, +>; + +/// A running WebTransport test server. +pub struct TestServer { + pub addr: SocketAddr, + pub url: String, + shutdown_tx: Option>, + task: Option>, + handler_tasks: TaskHandles, +} + +impl TestServer { + /// Shut down the server, verify exactly `expected_handlers` ran, and re-panic + /// if any handler panicked. + pub async fn shutdown(mut self, expected_handlers: usize) { + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + if let Some(task) = self.task.take() { + task.await.expect("accept loop task panicked"); + } + let handles: Vec<_> = self.handler_tasks.lock().unwrap().drain(..).collect(); + assert_eq!( + handles.len(), + expected_handlers, + "expected {expected_handlers} handler invocation(s), got {}", + handles.len() + ); + for (i, handle) in handles.into_iter().enumerate() { + let result = tokio::time::timeout(Duration::from_secs(5), handle) + .await + .unwrap_or_else(|_| panic!("handler {i} did not complete within 5s")); + if let Err(e) = result { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + } +} + +impl Drop for TestServer { + fn drop(&mut self) { + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + if let Some(task) = self.task.take() { + task.abort(); + } + } +} + +/// Start a WebTransport server on a random port using the given certificate and handler. +pub async fn start(cert: &TestCert, handler: ServerHandler) -> Result { + let addr: SocketAddr = "[::1]:0".parse().unwrap(); + + let server = web_transport_quinn::ServerBuilder::new() + .with_addr(addr) + .with_certificate(cert.chain.clone(), cert.key.clone_key())?; + + let actual_addr = server.local_addr()?; + let url = format!("https://localhost:{}", actual_addr.port()); + + tracing::debug!(%url, "test server listening"); + + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let handler_tasks: TaskHandles = Arc::new(Mutex::new(Vec::new())); + let handler_tasks2 = handler_tasks.clone(); + + let task = tokio::spawn(async move { + let mut server = server; + let mut handler = handler; + loop { + tokio::select! { + _ = &mut shutdown_rx => break, + request = server.accept() => { + let Some(request) = request else { break }; + let session = request.ok().await + .expect("failed to accept session"); + let fut = handler(session); + let handle = tokio::spawn(fut); + handler_tasks2.lock().unwrap().push(handle); + } + } + } + }); + + Ok(TestServer { + addr: actual_addr, + url, + shutdown_tx: Some(shutdown_tx), + task: Some(task), + handler_tasks, + }) +} + +/// A handler that echoes bidirectional streams back to the client. +pub fn echo_handler() -> ServerHandler { + Box::new(|session| { + Box::pin(async move { + echo_session(session).await; + }) + }) +} + +async fn echo_session(session: web_transport_quinn::Session) { + use tokio::io::AsyncWriteExt; + + let mut tasks = tokio::task::JoinSet::new(); + + loop { + tokio::select! { + stream = session.accept_bi() => { + match stream { + Ok((mut send, mut recv)) => { + tasks.spawn(async move { + let buf = recv.read_to_end(1024 * 1024).await + .expect("echo: read_to_end failed"); + send.write_all(&buf).await + .expect("echo: write_all failed"); + send.shutdown().await + .expect("echo: shutdown failed"); + }); + } + Err(e) if is_session_closed(&e) => break, + Err(e) => panic!("echo: accept_bi failed unexpectedly: {e}"), + } + } + datagram = session.read_datagram() => { + match datagram { + Ok(data) => session.send_datagram(data) + .expect("echo: send_datagram failed"), + Err(e) if is_session_closed(&e) => break, + Err(e) => panic!("echo: read_datagram failed unexpectedly: {e}"), + } + } + Some(result) = tasks.join_next() => { + if let Err(e) = result { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + } + } + + // Drain remaining stream tasks and propagate panics. + while let Some(result) = tasks.join_next().await { + if let Err(e) = result { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } +} + +/// Returns true if the error indicates the session was closed (locally or by the peer). +pub fn is_session_closed(e: &web_transport_quinn::SessionError) -> bool { + use web_transport_quinn::SessionError::*; + matches!( + e, + ConnectionError(web_transport_quinn::quinn::ConnectionError::ApplicationClosed(_)) + | ConnectionError(web_transport_quinn::quinn::ConnectionError::LocallyClosed) + | WebTransportError(web_transport_quinn::WebTransportError::Closed(_, _)) + ) +} + +/// A handler that accepts the session and immediately closes it. +pub fn immediate_close_handler(code: u32, reason: &'static str) -> ServerHandler { + Box::new(move |session| { + Box::pin(async move { + // Give the browser a bit of time to finish establishing the session. + // Without this, browser sometimes throws WebTransportError instead of + // cleanly closing the session. + tokio::time::sleep(Duration::from_millis(1000)).await; + session.close(code, reason.as_bytes()); + // Wait for the connection to actually close. + // This ensures the CloseWebTransportSession capsule is delivered. + session.closed().await; + }) + }) +} + +/// A handler that accepts the session and holds it open until the client disconnects. +pub fn idle_handler() -> ServerHandler { + Box::new(|session| { + Box::pin(async move { + let err = session.closed().await; + assert!( + is_session_closed(&err), + "idle: unexpected session error: {err}" + ); + }) + }) +} + +/// A boxed async handler invoked with the raw [web_transport_quinn::Request] before +/// the session is accepted. This allows tests to reject or customize the response. +pub type RequestHandler = Box< + dyn FnMut(web_transport_quinn::Request) -> Pin + Send>> + + Send + + 'static, +>; + +/// Start a WebTransport server that passes the raw [web_transport_quinn::Request] +/// to the handler instead of auto-accepting it. +pub async fn start_with_request_handler( + cert: &TestCert, + handler: RequestHandler, +) -> Result { + let addr: SocketAddr = "[::1]:0".parse().unwrap(); + + let server = web_transport_quinn::ServerBuilder::new() + .with_addr(addr) + .with_certificate(cert.chain.clone(), cert.key.clone_key())?; + + let actual_addr = server.local_addr()?; + let url = format!("https://localhost:{}", actual_addr.port()); + + tracing::debug!(%url, "test server listening (request handler)"); + + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let handler_tasks: TaskHandles = Arc::new(Mutex::new(Vec::new())); + let handler_tasks2 = handler_tasks.clone(); + + let task = tokio::spawn(async move { + let mut server = server; + let mut handler = handler; + loop { + tokio::select! { + _ = &mut shutdown_rx => break, + request = server.accept() => { + let Some(request) = request else { break }; + let fut = handler(request); + let handle = tokio::spawn(fut); + handler_tasks2.lock().unwrap().push(handle); + } + } + } + }); + + Ok(TestServer { + addr: actual_addr, + url, + shutdown_tx: Some(shutdown_tx), + task: Some(task), + handler_tasks, + }) +} diff --git a/web-transport-browser-tests/tests/bidi_stream.rs b/web-transport-browser-tests/tests/bidi_stream.rs new file mode 100644 index 0000000..95b3172 --- /dev/null +++ b/web-transport-browser-tests/tests/bidi_stream.rs @@ -0,0 +1,525 @@ +use web_transport_browser_tests::harness; +use web_transport_browser_tests::server::ServerHandler; +use web_transport_quinn::{SessionError, WebTransportError}; + +mod common; +use common::{init_tracing, TIMEOUT}; + +// --------------------------------------------------------------------------- +// Client-Initiated +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn bidi_stream_echo() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + await writer.write(new TextEncoder().encode("hello world")); + await writer.close(); + + let received = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + wt.close(); + return { + success: received === "hello world", + message: "echoed: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn bidi_stream_binary_data() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + // Build 256-byte array with values 0x00-0xFF + const sent = new Uint8Array(256); + for (let i = 0; i < 256; i++) sent[i] = i; + await writer.write(sent); + await writer.close(); + + const chunks = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + chunks.push(value); + } + const received = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0)); + let off = 0; + for (const c of chunks) { received.set(c, off); off += c.length; } + + let ok = received.length === 256; + for (let i = 0; ok && i < 256; i++) ok = received[i] === i; + + wt.close(); + return { success: ok, message: "binary len=" + received.length }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn bidi_stream_empty_write() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + // Immediately close without writing + await writer.close(); + + const { value, done } = await reader.read(); + wt.close(); + return { + success: done === true && !value, + message: "done=" + done + " value=" + value + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn bidi_stream_multiple_writes() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const enc = new TextEncoder(); + await writer.write(enc.encode("aaa")); + await writer.write(enc.encode("bbb")); + await writer.write(enc.encode("ccc")); + await writer.close(); + + let received = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + wt.close(); + return { + success: received === "aaabbbccc", + message: "received: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Server-Initiated +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn bidi_stream_server_initiated() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, _recv) = session.open_bi().await.expect("open_bi failed"); + send.write_all(b"from server") + .await + .expect("write_all failed"); + send.finish().expect("finish failed"); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingBidirectionalStreams.getReader(); + const { value: stream, done } = await reader.read(); + if (done) return { success: false, message: "no incoming stream" }; + + const sr = stream.readable.getReader(); + let received = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + wt.close(); + return { + success: received === "from server", + message: "received: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn bidi_stream_server_initiated_multiple() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + for i in 0u8..3 { + let (mut send, _recv) = session.open_bi().await.expect("open_bi failed"); + send.write_all(&[i]).await.expect("write_all failed"); + send.finish().expect("finish failed"); + } + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingBidirectionalStreams.getReader(); + const values = []; + for (let i = 0; i < 3; i++) { + const { value: stream, done } = await reader.read(); + if (done) break; + const sr = stream.readable.getReader(); + const { value } = await sr.read(); + values.push(value[0]); + } + wt.close(); + values.sort((a, b) => a - b); + const ok = values.length === 3 && values[0] === 0 && values[1] === 1 && values[2] === 2; + return { success: ok, message: "values: " + JSON.stringify(values) }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn bidi_stream_server_initiated_bidirectional_exchange() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.open_bi().await.expect("open_bi failed"); + send.write_all(b"ping").await.expect("write_all failed"); + send.finish().expect("finish failed"); + let data = recv.read_to_end(1024).await.expect("read_to_end failed"); + assert_eq!( + String::from_utf8_lossy(&data), + "pong", + "server should receive pong" + ); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingBidirectionalStreams.getReader(); + const { value: stream } = await reader.read(); + + const sr = stream.readable.getReader(); + let msg = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + msg += new TextDecoder().decode(value); + } + + const sw = stream.writable.getWriter(); + await sw.write(new TextEncoder().encode("pong")); + await sw.close(); + wt.close(); + return { success: msg === "ping", message: "received: " + msg }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Edge Cases +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn write_to_closed_transport() { + init_tracing(); + let harness = harness::setup(harness::idle_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + + wt.close(); + // Wait for close to take effect + await wt.closed; + + try { + await writer.write(new TextEncoder().encode("after close")); + return { success: false, message: "write should have failed" }; + } catch (e) { + if (!(e instanceof WebTransportError) || e.source !== "session") throw e; + return { success: true, message: "write failed after close: " + e }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn read_from_cancelled_stream() { + init_tracing(); + let harness = harness::setup(harness::idle_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const reader = stream.readable.getReader(); + + await reader.cancel(); + + const { value, done } = await reader.read(); + wt.close(); + return { + success: done === true, + message: "done=" + done + " value=" + value + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Half-Close & Priority +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn bidi_stream_half_close_write_then_read() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + let data = recv + .read_to_end(64 * 1024) + .await + .expect("read_to_end failed"); + // Double the data to make the half-close assertion unambiguous + let mut doubled = data.clone(); + doubled.extend_from_slice(&data); + send.write_all(&doubled).await.expect("write_all failed"); + send.finish().expect("finish failed"); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + await writer.write(new TextEncoder().encode("abc")); + await writer.close(); + + let received = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + wt.close(); + return { + success: received === "abcabc", + message: "received: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn bidi_stream_server_priority() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + for i in 0..3i32 { + let (mut send, _recv) = session.open_bi().await.expect("open_bi failed"); + send.set_priority(i).expect("set_priority failed"); + let msg = format!("prio{i}"); + send.write_all(msg.as_bytes()) + .await + .expect("write_all failed"); + send.finish().expect("finish failed"); + } + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingBidirectionalStreams.getReader(); + const messages = []; + for (let i = 0; i < 3; i++) { + const { value: stream, done } = await reader.read(); + if (done) break; + const sr = stream.readable.getReader(); + let msg = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + msg += new TextDecoder().decode(value); + } + messages.push(msg); + } + const expected = ["prio0", "prio1", "prio2"]; + const ok = JSON.stringify(messages) === JSON.stringify(expected); + wt.close(); + return { + success: ok, + message: "messages: " + JSON.stringify(messages) + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} diff --git a/web-transport-browser-tests/tests/common/mod.rs b/web-transport-browser-tests/tests/common/mod.rs new file mode 100644 index 0000000..8087237 --- /dev/null +++ b/web-transport-browser-tests/tests/common/mod.rs @@ -0,0 +1,19 @@ +#![allow(dead_code)] + +use std::time::Duration; + +/// Maximum time a JS test snippet may run in Chrome before the harness aborts it. +/// Suitable for most tests (simple echo, error handling, connection lifecycle). +pub const TIMEOUT: Duration = Duration::from_secs(10); + +/// Extended timeout for JS test snippets that transfer large payloads, open many +/// streams, or establish multiple sequential sessions. +pub const LONG_TIMEOUT: Duration = Duration::from_secs(30); + +/// Enable `tracing` output filtered by `RUST_LOG`. Safe to call from every test — +/// `try_init` silently no-ops after the first successful initialization. +pub fn init_tracing() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); +} diff --git a/web-transport-browser-tests/tests/concurrent.rs b/web-transport-browser-tests/tests/concurrent.rs new file mode 100644 index 0000000..ec892fb --- /dev/null +++ b/web-transport-browser-tests/tests/concurrent.rs @@ -0,0 +1,790 @@ +use std::time::Duration; + +use web_transport_browser_tests::harness; +use web_transport_browser_tests::server::ServerHandler; +use web_transport_quinn::{SessionError, WebTransportError}; + +mod common; +use common::{init_tracing, LONG_TIMEOUT, TIMEOUT}; + +// --------------------------------------------------------------------------- +// Multiple Concurrent Streams +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn multiple_bidi_streams_concurrent() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const N = 5; + + // Open N streams and write concurrently + const promises = []; + for (let i = 0; i < N; i++) { + promises.push((async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const msg = "stream" + i; + await writer.write(new TextEncoder().encode(msg)); + await writer.close(); + + let received = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + return received; + })()); + } + + const results = await Promise.all(promises); + const expected = Array.from({ length: N }, (_, i) => "stream" + i); + results.sort(); + expected.sort(); + const ok = JSON.stringify(results) === JSON.stringify(expected); + wt.close(); + return { + success: ok, + message: "results: " + JSON.stringify(results) + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Race conditions, rapid creation, mixed server streams +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn rapid_stream_creation() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const N = 50; + const promises = []; + for (let i = 0; i < N; i++) { + promises.push((async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + await writer.write(new Uint8Array([i % 256])); + await writer.close(); + const { value, done } = await reader.read(); + if (done) return false; + return value.length === 1 && value[0] === (i % 256); + })()); + } + const results = await Promise.all(promises); + const allOk = results.every(r => r === true); + wt.close(); + return { + success: allOk, + message: results.filter(r => !r).length + " of " + N + " failed" + }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_close_while_client_creating_streams() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // Accept the first stream + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + recv.read_to_end(32).await.expect("initial read failed"); + recv.received_reset().await.expect("expected close"); + tokio::time::sleep(Duration::from_millis(50)).await; + session.close(99, b"closing"); + session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const N = 20; + const promises = []; + for (let i = 0; i < N; i++) { + promises.push((async () => { + try { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new Uint8Array([i])); + await writer.close(); + return true; + } catch (e) { + return false; + } + })()); + } + const results = await Promise.all(promises); + const succeeded = results.filter(r => r).length; + const failed = results.filter(r => !r).length; + try { await wt.closed; } catch (e) {} + return { + success: succeeded >= 1, + message: "succeeded=" + succeeded + " failed=" + failed + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_opens_bidi_and_uni_simultaneously() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut bi_send, _bi_recv) = session.open_bi().await.expect("open_bi failed"); + bi_send + .write_all(b"bidi-data") + .await + .expect("write_all failed"); + bi_send.finish().expect("finish failed"); + + let mut uni_send = session.open_uni().await.expect("open_uni failed"); + uni_send + .write_all(b"uni-data") + .await + .expect("write_all failed"); + uni_send.finish().expect("finish failed"); + + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const [bidiResult, uniResult] = await Promise.all([ + (async () => { + const reader = wt.incomingBidirectionalStreams.getReader(); + const { value: stream, done } = await reader.read(); + if (done) return ""; + const sr = stream.readable.getReader(); + let msg = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + msg += new TextDecoder().decode(value); + } + return msg; + })(), + (async () => { + const reader = wt.incomingUnidirectionalStreams.getReader(); + const { value: stream, done } = await reader.read(); + if (done) return ""; + const sr = stream.getReader(); + let msg = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + msg += new TextDecoder().decode(value); + } + return msg; + })() + ]); + + wt.close(); + const ok = bidiResult === "bidi-data" && uniResult === "uni-data"; + return { + success: ok, + message: "bidi=" + bidiResult + " uni=" + uniResult + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn multiple_streams_mixed_types() { + init_tracing(); + + // Handler that echoes bidi, verifies uni, and echoes datagrams + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + loop { + tokio::select! { + stream = session.accept_bi() => { + let Ok((mut send, mut recv)) = stream else { break }; + tokio::spawn(async move { + let buf = recv.read_to_end(1024 * 1024).await.expect("echo: read_to_end failed"); + send.write_all(&buf).await.expect("echo: write_all failed"); + send.finish().expect("echo: finish failed"); + }); + } + stream = session.accept_uni() => { + let Ok(mut recv) = stream else { break }; + let session = session.clone(); + tokio::spawn(async move { + let data = recv.read_to_end(1024 * 1024).await.expect("read_to_end failed"); + assert_eq!( + String::from_utf8_lossy(&data), + "uni", + "server should receive uni stream data" + ); + // Acknowledge receipt back to the client + let mut ack = session.open_uni().await.expect("open_uni for ack failed"); + ack.write_all(b"uni-ack").await.expect("write ack failed"); + ack.finish().expect("finish ack failed"); + }); + } + datagram = session.read_datagram() => { + let Ok(data) = datagram else { break }; + session.send_datagram(data).expect("echo: send_datagram failed"); + } + } + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + + // Run bidi, uni, and datagram concurrently + const [bidiResult, uniResult, dgramResult] = await Promise.all([ + // Bidi echo + (async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + await writer.write(new TextEncoder().encode("bidi")); + await writer.close(); + let r = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + r += new TextDecoder().decode(value); + } + return r === "bidi"; + })(), + // Uni write + verify server received it + (async () => { + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + await writer.write(new TextEncoder().encode("uni")); + await writer.close(); + // Read the server's acknowledgment uni stream + const uniReader = wt.incomingUnidirectionalStreams.getReader(); + const { value: ackStream, done } = await uniReader.read(); + if (done) return false; + const sr = ackStream.getReader(); + let ack = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + ack += new TextDecoder().decode(value); + } + return ack === "uni-ack"; + })(), + // Datagram echo + (async () => { + const writer = wt.datagrams.writable.getWriter(); + const reader = wt.datagrams.readable.getReader(); + await writer.write(new TextEncoder().encode("dg")); + const { value } = await reader.read(); + return new TextDecoder().decode(value) === "dg"; + })() + ]); + + wt.close(); + return { + success: bidiResult && uniResult && dgramResult, + message: "bidi=" + bidiResult + " uni=" + uniResult + " dgram=" + dgramResult + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn many_streams_stress() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const N = 20; + const promises = []; + + for (let i = 0; i < N; i++) { + promises.push((async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const payload = new Uint8Array(64).fill(i); + await writer.write(payload); + await writer.close(); + + const chunks = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + chunks.push(value); + } + const received = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0)); + let off = 0; + for (const c of chunks) { received.set(c, off); off += c.length; } + + if (received.length !== 64) return false; + for (let j = 0; j < 64; j++) { + if (received[j] !== i) return false; + } + return true; + })()); + } + + const results = await Promise.all(promises); + const allOk = results.every(r => r === true); + wt.close(); + return { + success: allOk, + message: results.filter(r => !r).length + " of " + N + " failed" + }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Large Data Transfer +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn large_bidi_stream_256kb() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const SIZE = 256 * 1024; + const sent = new Uint8Array(SIZE); + for (let i = 0; i < SIZE; i++) sent[i] = i % 251; + await writer.write(sent); + await writer.close(); + + const chunks = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + chunks.push(value); + } + const received = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0)); + let off = 0; + for (const c of chunks) { received.set(c, off); off += c.length; } + + let ok = received.length === SIZE; + for (let i = 0; ok && i < SIZE; i += 1024) { + if (received[i] !== (i % 251)) ok = false; + } + wt.close(); + return { success: ok, message: "len=" + received.length }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn large_bidi_stream_1mb() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const SIZE = 1024 * 1024; + const sent = new Uint8Array(SIZE); + for (let i = 0; i < SIZE; i++) sent[i] = i % 251; + + // Write in chunks to avoid memory pressure + const CHUNK = 64 * 1024; + for (let off = 0; off < SIZE; off += CHUNK) { + await writer.write(sent.subarray(off, Math.min(off + CHUNK, SIZE))); + } + await writer.close(); + + const chunks = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + chunks.push(value); + } + const received = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0)); + let off = 0; + for (const c of chunks) { received.set(c, off); off += c.length; } + + let ok = received.length === SIZE; + // Spot-check pattern every 4KB + for (let i = 0; ok && i < SIZE; i += 4096) { + if (received[i] !== (i % 251)) ok = false; + } + wt.close(); + return { success: ok, message: "len=" + received.length }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn large_uni_stream_client_to_server_1mb() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut recv = session.accept_uni().await.expect("accept_uni failed"); + let data = recv + .read_to_end(2 * 1024 * 1024) + .await + .expect("read_to_end failed"); + assert_eq!(data.len(), 1024 * 1024, "server should receive 1MB"); + // Spot-check pattern every 4KB: each byte = (index % 251) as u8 + for i in (0..data.len()).step_by(4096) { + assert_eq!(data[i], (i % 251) as u8, "data mismatch at byte {i}"); + } + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + + const SIZE = 1024 * 1024; + const data = new Uint8Array(SIZE); + for (let i = 0; i < SIZE; i++) data[i] = i % 251; + + const CHUNK = 64 * 1024; + for (let off = 0; off < SIZE; off += CHUNK) { + await writer.write(data.subarray(off, Math.min(off + CHUNK, SIZE))); + } + await writer.close(); + wt.close(); + return { success: true, message: "sent 1MB via uni stream" }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Multiple Sessions +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn multiple_sessions_sequential() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const N = 20; + const echoes = []; + for (let i = 0; i < N; i++) { + const wt = await connectWebTransport(); + const s = await wt.createBidirectionalStream(); + const w = s.writable.getWriter(); + const r = s.readable.getReader(); + const msg = "session" + i; + await w.write(new TextEncoder().encode(msg)); + await w.close(); + let echo = ""; + while (true) { + const { value, done } = await r.read(); + if (done) break; + echo += new TextDecoder().decode(value); + } + echoes.push(echo); + wt.close(); + try { await wt.closed; } catch (e) { + if (!(e instanceof WebTransportError) || e.source !== "session") throw e; + } + } + const expected = Array.from({ length: N }, (_, i) => "session" + i); + const ok = JSON.stringify(echoes) === JSON.stringify(expected); + return { success: ok, message: "echoes: " + JSON.stringify(echoes) }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown_expecting(20).await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn multiple_sessions_concurrent() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const N = 20; + const promises = []; + for (let i = 0; i < N; i++) { + promises.push((async () => { + const wt = await connectWebTransport(); + const s = await wt.createBidirectionalStream(); + const w = s.writable.getWriter(); + const r = s.readable.getReader(); + const msg = "sess-" + i; + await w.write(new TextEncoder().encode(msg)); + await w.close(); + let echo = ""; + while (true) { + const { value, done } = await r.read(); + if (done) break; + echo += new TextDecoder().decode(value); + } + wt.close(); + return { i, echo }; + })()); + } + const results = await Promise.all(promises); + const failed = results.filter(r => r.echo !== "sess-" + r.i); + return { + success: failed.length === 0, + message: failed.length + " of " + N + " failed: " + JSON.stringify(failed) + }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown_expecting(20).await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Bidirectional Open +// --------------------------------------------------------------------------- + +/// Both client and server open bidi streams concurrently. The server opens +/// N streams sending "s0"…"s{N-1}", while the client opens N streams sending +/// "c0"…"c{N-1}". Each side reads the peer's data and echoes it back. +#[tokio::test] +async fn bidirectional_open() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut tasks = tokio::task::JoinSet::new(); + let n = 5usize; + + // Server opens N bidi streams and sends data + for i in 0..n { + let session = session.clone(); + tasks.spawn(async move { + let (mut send, mut recv) = session.open_bi().await.expect("open_bi failed"); + let msg = format!("s{i}"); + send.write_all(msg.as_bytes()) + .await + .expect("write_all failed"); + send.finish().expect("finish failed"); + let data = recv.read_to_end(1024).await.expect("read_to_end failed"); + assert_eq!( + String::from_utf8_lossy(&data), + msg, + "server stream {i}: expected echo" + ); + }); + } + + // Server accepts N client-opened bidi streams and echoes them + for _ in 0..n { + match session.accept_bi().await { + Ok((mut send, mut recv)) => { + tasks.spawn(async move { + let data = recv.read_to_end(1024).await.expect("read_to_end failed"); + send.write_all(&data).await.expect("write_all failed"); + send.finish().expect("finish failed"); + }); + } + Err(e) => panic!("accept_bi failed: {e}"), + } + } + + // Wait for all tasks, propagate panics + while let Some(result) = tasks.join_next().await { + if let Err(e) = result { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const N = 5; + const errors = []; + + // Client opens N bidi streams and sends data + const clientOpenPromises = []; + for (let i = 0; i < N; i++) { + clientOpenPromises.push((async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + const msg = "c" + i; + await writer.write(new TextEncoder().encode(msg)); + await writer.close(); + let received = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + if (received !== msg) errors.push("client stream " + i + ": expected " + msg + " got " + received); + })()); + } + + // Client accepts N server-opened bidi streams and echoes them + const serverStreamReader = wt.incomingBidirectionalStreams.getReader(); + const clientAcceptPromises = []; + for (let i = 0; i < N; i++) { + clientAcceptPromises.push((async () => { + const { value: stream, done } = await serverStreamReader.read(); + if (done) { errors.push("incoming bidi stream ended early"); return; } + const reader = stream.readable.getReader(); + const writer = stream.writable.getWriter(); + let received = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + // Echo back what the server sent + await writer.write(new TextEncoder().encode(received)); + await writer.close(); + })()); + } + + await Promise.all([...clientOpenPromises, ...clientAcceptPromises]); + wt.close(); + return { + success: errors.length === 0, + message: errors.length === 0 ? "all streams OK" : errors.join("; ") + }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} diff --git a/web-transport-browser-tests/tests/concurrent_accept.rs b/web-transport-browser-tests/tests/concurrent_accept.rs new file mode 100644 index 0000000..95d10ca --- /dev/null +++ b/web-transport-browser-tests/tests/concurrent_accept.rs @@ -0,0 +1,404 @@ +use std::time::Duration; + +use web_transport_browser_tests::harness; +use web_transport_browser_tests::server::ServerHandler; +use web_transport_quinn::{SessionError, WebTransportError}; + +mod common; +use common::{init_tracing, TIMEOUT}; + +/// Reproduces the lost-waker bug when multiple tasks call `accept_bi()` +/// concurrently on the same session. +/// +/// The browser opens N bidi streams. On the server side, N independent tasks +/// each call `session.accept_bi()` concurrently. With the unfold-based +/// implementation, only one waker is stored, so all but one task hang forever. +#[tokio::test] +async fn concurrent_accept_bi_from_multiple_tasks() { + init_tracing(); + + const N: usize = 3; + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut tasks = tokio::task::JoinSet::new(); + + // Spawn N independent tasks that each call accept_bi() concurrently. + // This is the pattern that triggers the lost-waker bug: the unfold + // stream only stores one waker, so N-1 tasks never get woken. + for i in 0..N { + let session = session.clone(); + tasks.spawn(async move { + let (mut send, mut recv) = session + .accept_bi() + .await + .unwrap_or_else(|e| panic!("task {i}: accept_bi failed: {e}")); + let data = recv + .read_to_end(1024) + .await + .unwrap_or_else(|e| panic!("task {i}: read_to_end failed: {e}")); + send.write_all(&data) + .await + .unwrap_or_else(|e| panic!("task {i}: write_all failed: {e}")); + send.finish() + .unwrap_or_else(|e| panic!("task {i}: finish failed: {e}")); + }); + } + + // All N tasks must complete. With the bug, this times out because + // N-1 tasks are stuck in accept_bi() with dead wakers. + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut completed = 0; + while let Some(result) = tokio::time::timeout_at(deadline, tasks.join_next()) + .await + .ok() + .flatten() + { + if let Err(e) = result { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + completed += 1; + } + assert_eq!( + completed, N, + "only {completed}/{N} accept_bi tasks completed (lost waker bug)" + ); + + // Keep the session alive until the client closes. + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + &format!( + r#" + const wt = await connectWebTransport(); + const N = {N}; + const promises = []; + + for (let i = 0; i < N; i++) {{ + promises.push((async () => {{ + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const msg = "bi-" + i; + await writer.write(new TextEncoder().encode(msg)); + await writer.close(); + + let received = ""; + while (true) {{ + const {{ value, done }} = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + }} + return received; + }})()); + }} + + const results = await Promise.all(promises); + const expected = Array.from({{ length: N }}, (_, i) => "bi-" + i); + results.sort(); + expected.sort(); + const ok = JSON.stringify(results) === JSON.stringify(expected); + wt.close(); + return {{ + success: ok, + message: "results: " + JSON.stringify(results) + }}; + "# + ), + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +/// Tests that `accept_bi()` and `accept_uni()` work correctly when called +/// concurrently from separate tasks on the same session. They share a single +/// `Mutex` but operate on independent internal state. +#[tokio::test] +async fn concurrent_accept_bi_and_uni_from_multiple_tasks() { + init_tracing(); + + const N: usize = 3; + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut tasks = tokio::task::JoinSet::new(); + + // Spawn N tasks for accept_bi and N tasks for accept_uni concurrently. + for i in 0..N { + let session = session.clone(); + tasks.spawn(async move { + let (mut send, mut recv) = session + .accept_bi() + .await + .unwrap_or_else(|e| panic!("bi task {i}: accept_bi failed: {e}")); + let data = recv + .read_to_end(1024) + .await + .unwrap_or_else(|e| panic!("bi task {i}: read_to_end failed: {e}")); + send.write_all(&data) + .await + .unwrap_or_else(|e| panic!("bi task {i}: write_all failed: {e}")); + send.finish() + .unwrap_or_else(|e| panic!("bi task {i}: finish failed: {e}")); + format!("bi-done-{i}") + }); + } + for i in 0..N { + let session = session.clone(); + tasks.spawn(async move { + let mut recv = session + .accept_uni() + .await + .unwrap_or_else(|e| panic!("uni task {i}: accept_uni failed: {e}")); + let data = recv + .read_to_end(1024) + .await + .unwrap_or_else(|e| panic!("uni task {i}: read_to_end failed: {e}")); + String::from_utf8(data) + .unwrap_or_else(|e| panic!("uni task {i}: invalid utf8: {e}")) + }); + } + + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut completed = 0; + while let Some(result) = tokio::time::timeout_at(deadline, tasks.join_next()) + .await + .ok() + .flatten() + { + if let Err(e) = result { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + completed += 1; + } + assert_eq!( + completed, + N * 2, + "only {completed}/{} accept tasks completed", + N * 2 + ); + + // Signal the browser that all streams were received. + let mut signal = session + .open_uni() + .await + .expect("open_uni for signal failed"); + signal.write_all(b"ok").await.expect("signal write failed"); + signal.finish().expect("signal finish failed"); + + // Keep the session alive until the client closes. + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + &format!( + r#" + const wt = await connectWebTransport(); + const N = {N}; + const promises = []; + + // Open N bidi streams (server echoes them back) + for (let i = 0; i < N; i++) {{ + promises.push((async () => {{ + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const msg = "bi-" + i; + await writer.write(new TextEncoder().encode(msg)); + await writer.close(); + + let received = ""; + while (true) {{ + const {{ value, done }} = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + }} + return received; + }})()); + }} + + // Open N uni streams + for (let i = 0; i < N; i++) {{ + promises.push((async () => {{ + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + await writer.write(new TextEncoder().encode("uni-" + i)); + await writer.close(); + return "uni-sent-" + i; + }})()); + }} + + const results = await Promise.all(promises); + + // Wait for the server signal before closing. + const reader = wt.incomingUnidirectionalStreams.getReader(); + const {{ value: signal }} = await reader.read(); + const sr = signal.getReader(); + await sr.read(); + + wt.close(); + + const biResults = results.slice(0, N).sort(); + const expected = Array.from({{ length: N }}, (_, i) => "bi-" + i).sort(); + const ok = JSON.stringify(biResults) === JSON.stringify(expected); + return {{ + success: ok, + message: "bi results: " + JSON.stringify(biResults) + }}; + "# + ), + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +/// Same bug but for `accept_uni()`: multiple tasks calling it concurrently +/// causes lost wakers. +#[tokio::test] +async fn concurrent_accept_uni_from_multiple_tasks() { + init_tracing(); + + const N: usize = 3; + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut tasks = tokio::task::JoinSet::new(); + + for i in 0..N { + let session = session.clone(); + tasks.spawn(async move { + let mut recv = session + .accept_uni() + .await + .unwrap_or_else(|e| panic!("task {i}: accept_uni failed: {e}")); + let data = recv + .read_to_end(1024) + .await + .unwrap_or_else(|e| panic!("task {i}: read_to_end failed: {e}")); + String::from_utf8(data) + .unwrap_or_else(|e| panic!("task {i}: invalid utf8: {e}")) + }); + } + + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut received = Vec::new(); + while let Some(result) = tokio::time::timeout_at(deadline, tasks.join_next()) + .await + .ok() + .flatten() + { + match result { + Ok(s) => received.push(s), + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + } + received.sort(); + assert_eq!( + received.len(), + N, + "only {}/{N} accept_uni tasks completed (lost waker bug); got: {received:?}", + received.len() + ); + + // Signal the browser that all streams were received. + let mut signal = session + .open_uni() + .await + .expect("open_uni for signal failed"); + signal.write_all(b"ok").await.expect("signal write failed"); + signal.finish().expect("signal finish failed"); + + // Keep the session alive until the client closes. + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + &format!( + r#" + const wt = await connectWebTransport(); + const N = {N}; + const promises = []; + + for (let i = 0; i < N; i++) {{ + promises.push((async () => {{ + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + await writer.write(new TextEncoder().encode("uni-" + i)); + await writer.close(); + return true; + }})()); + }} + + await Promise.all(promises); + + // Wait for the server to signal that all streams were received + // before closing, so we don't abort in-flight stream decodes. + const reader = wt.incomingUnidirectionalStreams.getReader(); + const {{ value: signal }} = await reader.read(); + const sr = signal.getReader(); + await sr.read(); + + wt.close(); + return {{ success: true, message: "sent " + N + " uni streams" }}; + "# + ), + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} diff --git a/web-transport-browser-tests/tests/connection.rs b/web-transport-browser-tests/tests/connection.rs new file mode 100644 index 0000000..d137fcf --- /dev/null +++ b/web-transport-browser-tests/tests/connection.rs @@ -0,0 +1,546 @@ +use std::time::Duration; + +use bytes::Bytes; +use web_transport_browser_tests::harness; +use web_transport_browser_tests::server::{RequestHandler, ServerHandler}; +use web_transport_quinn::generic::Stats; +use web_transport_quinn::{quinn, SessionError, WebTransportError}; + +mod common; +use common::{init_tracing, TIMEOUT}; + +// --------------------------------------------------------------------------- +// Session Stats +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn session_stats_after_data_transfer() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + let data = recv + .read_to_end(64 * 1024) + .await + .expect("read_to_end failed"); + send.write_all(&data).await.expect("write_all failed"); + send.finish().expect("finish failed"); + + let stats = session.stats(); + assert!( + stats.bytes_sent().unwrap_or(0) > 0, + "bytes_sent should be > 0" + ); + assert!( + stats.bytes_received().unwrap_or(0) > 0, + "bytes_received should be > 0" + ); + assert!( + stats.packets_sent().unwrap_or(0) > 0, + "packets_sent should be > 0" + ); + assert!( + stats.rtt().unwrap_or(Duration::ZERO) > Duration::ZERO, + "rtt should be > 0" + ); + + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const data = new Uint8Array(10 * 1024).fill(42); + await writer.write(data); + await writer.close(); + + let total = 0; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + total += value.length; + } + wt.close(); + return { success: total === 10240, message: "echoed " + total + " bytes" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Session Close +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn close_client_server_sees_closed() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + wt.close({ closeCode: 99, reason: "bye" }); + await wt.closed; + return { success: true, message: "client closed session" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn close_client_with_code_and_reason() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let err = session.closed().await; + match err { + web_transport_quinn::SessionError::WebTransportError( + web_transport_quinn::WebTransportError::Closed(code, reason), + ) => { + assert_eq!(code, 99, "close code should round-trip"); + assert_eq!(reason, "bye", "close reason should round-trip"); + } + other => { + panic!("expected WebTransportError::Closed, got {other:?}"); + } + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + wt.close({ closeCode: 99, reason: "bye" }); + await wt.closed; + return { success: true, message: "closed with code 99" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn close_server_client_sees_closed() { + init_tracing(); + let harness = harness::setup(harness::immediate_close_handler(7, "server goodbye")) + .await + .unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + await wt.closed; + return { success: true, message: "server closed session" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn connection_closed_resolves_on_server_close() { + init_tracing(); + let harness = harness::setup(harness::immediate_close_handler(7, "server goodbye")) + .await + .unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const info = await wt.closed; + const ok = info.closeCode === 7 && info.reason === "server goodbye"; + return { + success: ok, + message: "closeCode=" + info.closeCode + " reason=" + JSON.stringify(info.reason) + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn close_server_while_streaming() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // Accept a bidi stream to confirm the client started streaming + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + recv.read(&mut [1]).await.expect("read failed"); + session.close(55, b"mid-stream"); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::ConnectionError(quinn::ConnectionError::LocallyClosed) + ), + "expected ConnectionError::LocallyClosed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + try { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + + // Start writing but don't close — server will close the session + for (let i = 0; i < 10; i++) { + const chunk = new Uint8Array(1024).fill(i); + await writer.write(chunk); + await new Promise(r => setTimeout(r, 50)); + } + + // Wait for the session to close. + await wt.closed; + + // Verify we can't write anymore + try { + await writer.write(new TextEncoder().encode("after close")); + return { success: false, message: "write after close should fail" }; + } catch (e) { + return { success: true, message: "write failed after server close: " + e }; + } + } catch (e) { + // Server close may race with stream setup/write — that's still a valid + // demonstration that the server close disrupted the client. + return { success: true, message: "server close interrupted streaming: " + e }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn close_client_while_streaming() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // Accept a bidi stream to confirm the client started streaming + let (mut send, _recv) = session.accept_bi().await.expect("accept_bi failed"); + + // Start writing from the server side — the client will close mid-stream + let chunk = vec![0xABu8; 4096]; + loop { + match send.write_all(&chunk).await { + Ok(()) => {} + Err(_) => break, + } + } + + let err = session.closed().await; + match err { + SessionError::WebTransportError(WebTransportError::Closed(code, reason)) => { + assert_eq!(code, 77, "close code mismatch"); + assert_eq!(reason, "client mid-stream", "close reason mismatch"); + } + other => panic!( + "expected WebTransportError::Closed(77, \"client mid-stream\"), got {other:?}" + ), + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const reader = stream.readable.getReader(); + + // Give the server a moment to start writing back + await reader.read(); + + // Close the session from the client while the server is writing + wt.close({ closeCode: 77, reason: "client mid-stream" }); + await wt.closed; + return { success: true, message: "client closed while server was streaming" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Edge Cases +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn session_rejection_by_server() { + init_tracing(); + + let handler: RequestHandler = Box::new(|request| { + Box::pin(async move { + request + .reject(http::StatusCode::NOT_FOUND) + .await + .expect("reject failed"); + }) + }); + + let harness = harness::setup_with_request_handler(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = new WebTransport(SERVER_URL, { + serverCertificateHashes: [{ + algorithm: "sha-256", + value: CERT_HASH, + }], + }); + try { + await wt.ready; + throw new Error("ready should have rejected"); + } catch (e) { + if (!(e instanceof WebTransportError) || e.source !== "session") throw e; + } + return { success: true, message: "session rejected" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Close boundary values (parameterized) +// --------------------------------------------------------------------------- + +macro_rules! close_code_reason_test { + ($name:ident, $code:expr, $reason_js:expr, $reason_rust:expr) => { + #[tokio::test] + async fn $name() { + init_tracing(); + + let expected_code: u32 = $code; + let expected_reason: String = ($reason_rust).to_string(); + + let handler: ServerHandler = Box::new({ + let reason = expected_reason.clone(); + move |session| { + let reason = reason.clone(); + Box::pin(async move { + let err = session.closed().await; + match err { + SessionError::WebTransportError(WebTransportError::Closed(c, r)) => { + assert_eq!(c, expected_code, "close code mismatch"); + assert_eq!(r, reason, "close reason mismatch"); + } + other => panic!("expected WebTransportError::Closed, got {other:?}"), + } + }) + } + }); + + let harness = harness::setup(handler).await.unwrap(); + + let js_code = format!( + r#" + const wt = await connectWebTransport(); + wt.close({{ closeCode: {}, reason: {} }}); + await wt.closed; + return {{ success: true, message: "closed" }}; + "#, + expected_code, $reason_js + ); + + let result = harness.run_js(&js_code, TIMEOUT).await; + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); + } + }; +} + +close_code_reason_test!(close_code_zero, 0, r#""""#, ""); +close_code_reason_test!(close_max_code, 4294967295u32, r#""max""#, "max"); +close_code_reason_test!(close_empty_reason, 42, r#""""#, ""); +close_code_reason_test!(close_unicode_reason, 1, r#""goodbye 👋🌍""#, "goodbye 👋🌍"); +close_code_reason_test!( + close_long_reason, + 1, + r#""x".repeat(1024)"#, + "x".repeat(1024) +); + +// --------------------------------------------------------------------------- +// Server use-after-close (parameterized) +// --------------------------------------------------------------------------- + +macro_rules! server_use_after_close_test { + ($name:ident, |$session:ident| $op:expr) => { + #[tokio::test] + async fn $name() { + init_tracing(); + + let handler: ServerHandler = Box::new(|$session| { + Box::pin(async move { + $session.close(7, b"done"); + $session.closed().await; + let err = { $op }.unwrap_err(); + assert!( + matches!( + err, + SessionError::ConnectionError(quinn::ConnectionError::LocallyClosed) + ), + "expected ConnectionError::LocallyClosed, got {err:?}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + try { await wt.closed; } catch (e) { + if (!(e instanceof WebTransportError)) throw e; + } + return { success: true, message: "session closed" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); + } + }; +} + +server_use_after_close_test!(server_open_bi_after_close, |session| session + .open_bi() + .await); +server_use_after_close_test!(server_open_uni_after_close, |session| session + .open_uni() + .await); +server_use_after_close_test!(server_accept_bi_after_close, |session| session + .accept_bi() + .await); +server_use_after_close_test!(server_accept_uni_after_close, |session| session + .accept_uni() + .await); + +#[tokio::test] +async fn server_send_datagram_after_close() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + session.close(7, b"done"); + session.closed().await; + let err = session + .send_datagram(Bytes::from_static(b"test")) + .unwrap_err(); + assert!( + matches!( + err, + SessionError::ConnectionError(quinn::ConnectionError::LocallyClosed) + ), + "expected ConnectionError(LocallyClosed), got {err:?}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + try { await wt.closed; } catch (e) { + if (!(e instanceof WebTransportError)) throw e; + } + return { success: true, message: "session closed" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} diff --git a/web-transport-browser-tests/tests/datagram.rs b/web-transport-browser-tests/tests/datagram.rs new file mode 100644 index 0000000..cdbb13e --- /dev/null +++ b/web-transport-browser-tests/tests/datagram.rs @@ -0,0 +1,313 @@ +use std::time::Duration; + +use bytes::Bytes; +use web_transport_browser_tests::harness; +use web_transport_browser_tests::server::ServerHandler; +use web_transport_quinn::{SessionError, WebTransportError}; + +mod common; +use common::{init_tracing, TIMEOUT}; + +#[tokio::test] +async fn datagram_echo() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const writer = wt.datagrams.writable.getWriter(); + const reader = wt.datagrams.readable.getReader(); + + await writer.write(new TextEncoder().encode("dgram hello")); + const { value } = await reader.read(); + const received = new TextDecoder().decode(value); + wt.close(); + return { + success: received === "dgram hello", + message: "echoed: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn datagram_binary_data() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const writer = wt.datagrams.writable.getWriter(); + const reader = wt.datagrams.readable.getReader(); + + const sent = new Uint8Array([0, 128, 255, 42]); + await writer.write(sent); + const { value } = await reader.read(); + + const ok = value.length === 4 && + value[0] === 0 && value[1] === 128 && + value[2] === 255 && value[3] === 42; + wt.close(); + return { success: ok, message: "received: [" + Array.from(value) + "]" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn datagram_multiple_roundtrips() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const writer = wt.datagrams.writable.getWriter(); + const reader = wt.datagrams.readable.getReader(); + + // Send 10 datagrams + for (let i = 0; i < 10; i++) { + await writer.write(new Uint8Array([i])); + } + + // Wait for a single echoed datagram + const { value } = await reader.read(); + const valid = value.length === 1 && value[0] >= 0 && value[0] <= 9; + reader.releaseLock(); + wt.close(); + return { + success: valid, + message: "received datagram with value " + Array.from(value) + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn datagram_max_size() { + init_tracing(); + let harness = harness::setup(harness::idle_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const maxSize = wt.datagrams.maxDatagramSize; + wt.close(); + return { + success: typeof maxSize === "number" && maxSize >= 500, + message: "maxDatagramSize=" + maxSize + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn datagram_server_initiated() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // Small delay to let the client set up a datagram reader + tokio::time::sleep(Duration::from_millis(100)).await; + session + .send_datagram(Bytes::from_static(b"server dgram")) + .expect("send_datagram failed"); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.datagrams.readable.getReader(); + const { value } = await reader.read(); + const received = new TextDecoder().decode(value); + wt.close(); + return { + success: received === "server dgram", + message: "received: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Size boundaries & properties +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn datagram_at_max_size() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const maxSize = wt.datagrams.maxDatagramSize; + const writer = wt.datagrams.writable.getWriter(); + const reader = wt.datagrams.readable.getReader(); + + const data = new Uint8Array(maxSize).fill(0xAB); + await writer.write(data); + const { value } = await reader.read(); + wt.close(); + return { + success: value.length === maxSize, + message: "sent=" + maxSize + " received=" + value.length + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn datagram_oversized_rejected() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let max_size = session.max_datagram_size(); + let oversized = Bytes::from(vec![0xFFu8; max_size + 100]); + let result = session.send_datagram(oversized); + assert!( + result.is_err(), + "oversized send_datagram should fail, but succeeded" + ); + session.close(0, b""); + session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + // Give the server time to attempt the oversized send + await wt.closed; + return { success: true, message: "server rejected oversized datagram" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +#[ignore = "empty datagrams appear to be silently dropped, causing a timeout"] +async fn datagram_empty() { + init_tracing(); + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const writer = wt.datagrams.writable.getWriter(); + const reader = wt.datagrams.readable.getReader(); + + await writer.write(new Uint8Array(0)); + const { value } = await reader.read(); + wt.close(); + return { + success: value.length === 0, + message: "received length=" + value.length + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn datagram_high_water_marks() { + init_tracing(); + let harness = harness::setup(harness::idle_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const inHWM = wt.datagrams.incomingHighWaterMark; + const outHWM = wt.datagrams.outgoingHighWaterMark; + const initialOk = typeof inHWM === "number" && typeof outHWM === "number"; + + wt.datagrams.incomingHighWaterMark = 10; + wt.datagrams.outgoingHighWaterMark = 20; + const newIn = wt.datagrams.incomingHighWaterMark; + const newOut = wt.datagrams.outgoingHighWaterMark; + + wt.close(); + return { + success: initialOk && newIn === 10 && newOut === 20, + message: "initial: in=" + inHWM + " out=" + outHWM + + " new: in=" + newIn + " out=" + newOut + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} diff --git a/web-transport-browser-tests/tests/smoke.rs b/web-transport-browser-tests/tests/smoke.rs new file mode 100644 index 0000000..727fc87 --- /dev/null +++ b/web-transport-browser-tests/tests/smoke.rs @@ -0,0 +1,25 @@ +use std::time::Duration; + +use web_transport_browser_tests::harness; + +#[tokio::test] +async fn browser_connects_to_server() { + let harness = harness::setup(harness::echo_handler()).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + wt.close(); + return { success: true, message: "connected and closed" }; + "#, + Duration::from_secs(10), + ) + .await; + + // Teardown before asserting so resources are cleaned up even on failure. + harness.teardown().await; + + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} diff --git a/web-transport-browser-tests/tests/stream_error.rs b/web-transport-browser-tests/tests/stream_error.rs new file mode 100644 index 0000000..767866a --- /dev/null +++ b/web-transport-browser-tests/tests/stream_error.rs @@ -0,0 +1,1177 @@ +use std::time::Duration; + +use web_transport_browser_tests::harness; +use web_transport_browser_tests::server::ServerHandler; +use web_transport_quinn::{quinn, ReadError, SessionError, WebTransportError, WriteError}; + +mod common; +use common::{init_tracing, TIMEOUT}; + +#[tokio::test] +async fn stream_client_abort_sends_reset() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + send.write(&[1, 2, 3]).await.expect("write failed"); + let code = recv.received_reset().await.ok().flatten(); + assert_eq!( + code, + Some(42), + "server should receive RESET_STREAM with code 42" + ); + send.write(&[1, 2, 3]).await.expect("write failed"); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + + // Wait for the server to establish the stream + const reader = stream.readable.getReader(); + await reader.read(); + + const writer = stream.writable.getWriter(); + + // Construct WebTransportError — try both (message, init) and (init) forms + let err = new WebTransportError({ message: "abort", streamErrorCode: 42 }); + await writer.abort(err); + + await reader.read(); + return { success: true, message: "writer aborted with code 42" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn stream_client_cancel_sends_stop_sending() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, _recv) = session.accept_bi().await.expect("accept_bi failed"); + let (mut send2, _recv2) = session.accept_bi().await.expect("2nd accept_bi failed"); + send.write(&[1]).await.expect("write failed"); + let code = send.stopped().await.ok().flatten(); + assert_eq!( + code, + Some(77), + "server should receive STOP_SENDING with code 77" + ); + send2.write(&[1]).await.expect("write to 2nd stream failed"); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const stream2 = await wt.createBidirectionalStream(); + const reader = stream.readable.getReader(); + const reader2 = stream2.readable.getReader(); + + // Wait for the server to establish the stream + await reader.read(); + + let err = new WebTransportError({ message: "cancel", streamErrorCode: 77 }); + await reader.cancel(err); + + await reader2.read(); + return { success: true, message: "reader cancelled with code 77" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn stream_client_reset_server_reader_errors() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + send.write(&[1]).await.expect("write failed"); + let mut buf = [0u8; 1024]; + loop { + match recv.read(&mut buf).await { + Ok(Some(_)) => continue, + Ok(None) => panic!("expected reset, got clean finish"), + Err(ReadError::Reset(code)) => { + assert_eq!(code, 42, "reset code should be 42"); + break; + } + Err(e) => panic!("unexpected read error: {e}"), + } + } + send.write(&[1]).await.expect("write failed"); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + + // Wait for the server to establish the stream + const reader = stream.readable.getReader(); + await reader.read(); + + const writer = stream.writable.getWriter(); + await writer.write(new TextEncoder().encode("some data")); + let err = new WebTransportError({ message: "abort", streamErrorCode: 42 }); + await writer.abort(err); + + await reader.read(); + return { success: true, message: "writer aborted with code 42" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn stream_client_stop_server_writer_errors() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, _recv) = session.accept_bi().await.expect("accept_bi failed"); + let (mut send2, _recv2) = session.accept_bi().await.expect("2nd accept_bi failed"); + // Keep writing until we get a Stopped error from the client's cancel + let chunk = vec![0u8; 1024]; + loop { + match send.write_all(&chunk).await { + Ok(()) => { + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(WriteError::Stopped(code)) => { + assert_eq!(code, 77, "stop code should be 77"); + break; + } + Err(e) => panic!("unexpected write error: {e}"), + } + } + send2.write(&[1]).await.expect("write to 2nd stream failed"); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const stream2 = await wt.createBidirectionalStream(); + const reader = stream.readable.getReader(); + const reader2 = stream2.readable.getReader(); + + // Read to confirm the server has started writing + await reader.read(); + + let err = new WebTransportError({ message: "cancel", streamErrorCode: 77 }); + await reader.cancel(err); + + await reader2.read(); + + return { success: true, message: "reader cancelled with code 77" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn stream_server_reset_client_reader_errors() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + // Read to wait until the client starts + let mut buf = [0u8; 1024]; + recv.read(&mut buf).await.expect("read failed"); + send.reset(33).expect("reset failed"); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + try { + const stream = await wt.createBidirectionalStream(); + const reader = stream.readable.getReader(); + const writer = stream.writable.getWriter(); + await writer.write(new Uint8Array([1])); + + while (true) { + const { done } = await reader.read(); + if (done) { + wt.close(); + return { success: false, message: "expected reader to error on reset" }; + } + } + } catch (e) { + const isWTE = e instanceof WebTransportError; + const code = isWTE ? e.streamErrorCode : null; + wt.close(); + return { + success: isWTE && e.source === "stream" && code === 33, + message: "reader errored: isWebTransportError=" + isWTE + " code=" + code + " " + e, + details: { streamErrorCode: code } + }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn stream_server_stop_client_writer_errors() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + // Read so QUIC acknowledges the stream before calling stop() + let mut buf = [0u8; 1024]; + recv.read(&mut buf).await.expect("read failed"); + recv.stop(88).expect("stop failed"); + // small delay to make sure stop propagates + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + try { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new Uint8Array([1])); + + // Keep writing until we get an error from STOP_SENDING + for (let i = 0; i < 100; i++) { + await writer.write(new Uint8Array(1024)); + await new Promise(r => setTimeout(r, 10)); + } + wt.close(); + return { success: false, message: "expected writer to error on stop" }; + } catch (e) { + const isWTE = e instanceof WebTransportError; + const code = isWTE ? e.streamErrorCode : null; + wt.close(); + return { + success: isWTE && e.source === "stream" && code === 88, + message: "writer errored: isWebTransportError=" + isWTE + " code=" + code + " " + e, + details: { streamErrorCode: code } + }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Connection close interrupts stream I/O +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn client_close_interrupts_server_read() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + let mut buf = [0u8; 1024]; + loop { + match recv.read(&mut buf).await { + Ok(Some(_)) => continue, + Ok(None) => panic!("expected connection error, got clean finish"), + Err(ReadError::SessionError(SessionError::WebTransportError( + WebTransportError::Closed(_, _), + ))) => break, + Err(e) => panic!("expected session closed, got {e}"), + } + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new TextEncoder().encode("some data")); + wt.close(); + await wt.closed; + return { success: true, message: "client closed while server was reading" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn client_close_interrupts_server_write() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + // Read to confirm the client has opened the stream + let mut buf = [0u8; 1024]; + recv.read(&mut buf).await.expect("read failed"); + let chunk = vec![0u8; 1024]; + loop { + match send.write_all(&chunk).await { + Ok(()) => { + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(WriteError::SessionError(SessionError::WebTransportError( + WebTransportError::Closed(_, _), + ))) => break, + Err(e) => panic!("expected session closed, got {e}"), + } + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + // Write to trigger server accept + await writer.write(new Uint8Array([1])); + // Small delay so the server starts writing + await new Promise(r => setTimeout(r, 50)); + wt.close(); + await wt.closed; + return { success: true, message: "client closed while server was writing" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_close_interrupts_client_read() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + // Read to confirm the client has started + let mut buf = [0u8; 1024]; + recv.read(&mut buf).await.expect("read failed"); + session.close(0, b""); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + // Write to trigger server accept + await writer.write(new Uint8Array([1])); + try { + while (true) { + const { done } = await reader.read(); + if (done) { + return { success: false, message: "expected error, got clean finish" }; + } + } + } catch (e) { + if (!(e instanceof WebTransportError) || e.source !== "session") throw e; + return { success: true, message: "read interrupted by server close: " + e }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_close_interrupts_client_write() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + // Read to confirm the client has started + let mut buf = [0u8; 1024]; + recv.read(&mut buf).await.expect("read failed"); + session.close(0, b""); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + // Write to trigger server accept + await writer.write(new Uint8Array([1])); + try { + for (let i = 0; i < 100; i++) { + await writer.write(new Uint8Array(1024)); + await new Promise(r => setTimeout(r, 10)); + } + return { success: false, message: "expected error on write" }; + } catch (e) { + if (!(e instanceof WebTransportError) || e.source !== "session") throw e; + return { success: true, message: "write interrupted by server close: " + e }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Connection close interrupts accept +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn client_close_interrupts_server_accept_bi() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // First accept succeeds — client opened one stream before closing + // Keep references to streams to avoid early cancellation + let _s1 = session.accept_bi().await.expect("first accept_bi failed"); + // Second accept should fail with a session close error + let err = session.accept_bi().await.unwrap_err(); + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new Uint8Array([1])); + wt.close(); + await wt.closed; + return { success: true, message: "client closed while server was accepting bidi" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn client_close_interrupts_server_accept_uni() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // First accept succeeds — client opened one stream before closing + // Keep references to streams to avoid early cancellation + let _s1 = session.accept_uni().await.expect("first accept_uni failed"); + // Second accept should fail with a session close error + let err = session.accept_uni().await.unwrap_err(); + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + await writer.write(new Uint8Array([1])); + await writer.close(); + wt.close(); + await wt.closed; + return { success: true, message: "client closed while server was accepting uni" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_close_interrupts_client_accept_bi() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // Small delay so the client starts waiting on incomingBidirectionalStreams + tokio::time::sleep(Duration::from_millis(100)).await; + session.close(0, b""); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingBidirectionalStreams.getReader(); + try { + const { done } = await reader.read(); + if (done) { + return { success: true, message: "incomingBidirectionalStreams closed" }; + } + return { success: false, message: "expected stream to end, got a value" }; + } catch (e) { + if (!(e instanceof WebTransportError) || e.source !== "session") throw e; + return { success: true, message: "accept bidi interrupted by server close: " + e }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_close_interrupts_client_accept_uni() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + // Small delay so the client starts waiting on incomingUnidirectionalStreams + tokio::time::sleep(Duration::from_millis(100)).await; + session.close(0, b""); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingUnidirectionalStreams.getReader(); + try { + const { done } = await reader.read(); + if (done) { + return { success: true, message: "incomingUnidirectionalStreams closed" }; + } + return { success: false, message: "expected stream to end, got a value" }; + } catch (e) { + if (!(e instanceof WebTransportError) || e.source !== "session") throw e; + return { success: true, message: "accept uni interrupted by server close: " + e }; + } + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Stream isolation +// --------------------------------------------------------------------------- + +// --------------------------------------------------------------------------- +// Boundary error codes (parameterized) +// --------------------------------------------------------------------------- + +macro_rules! reset_code_test { + ($name:ident, $code:expr) => { + #[tokio::test] + async fn $name() { + init_tracing(); + + let expected_code: u32 = $code; + + let handler: ServerHandler = Box::new(move |session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + send.write(&[1]).await.expect("write failed"); + let code = recv.received_reset().await.ok().flatten(); + assert_eq!( + code, + Some(expected_code), + "reset code mismatch" + ); + send.write(&[1]).await.expect("write failed"); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let js_code = format!( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const reader = stream.readable.getReader(); + await reader.read(); + const writer = stream.writable.getWriter(); + let err = new WebTransportError({{ message: "abort", streamErrorCode: {} }}); + await writer.abort(err); + await reader.read(); + return {{ success: true, message: "writer aborted with code {}" }}; + "#, + expected_code, expected_code + ); + + let result = harness.run_js(&js_code, TIMEOUT).await; + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); + } + }; +} + +reset_code_test!(stream_reset_code_zero, 0); +reset_code_test!(stream_reset_code_255, 255); + +macro_rules! stop_code_test { + ($name:ident, $code:expr) => { + #[tokio::test] + async fn $name() { + init_tracing(); + + let expected_code: u32 = $code; + + let handler: ServerHandler = Box::new(move |session| { + Box::pin(async move { + let (mut send, _recv) = session.accept_bi().await.expect("accept_bi failed"); + let (mut send2, _recv2) = session.accept_bi().await.expect("2nd accept_bi failed"); + send.write(&[1]).await.expect("write failed"); + let code = send.stopped().await.ok().flatten(); + assert_eq!( + code, + Some(expected_code), + "stop code mismatch" + ); + send2.write(&[1]).await.expect("write to 2nd stream failed"); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let js_code = format!( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const stream2 = await wt.createBidirectionalStream(); + const reader = stream.readable.getReader(); + const reader2 = stream2.readable.getReader(); + await reader.read(); + let err = new WebTransportError({{ message: "cancel", streamErrorCode: {} }}); + await reader.cancel(err); + await reader2.read(); + return {{ success: true, message: "reader cancelled with code {}" }}; + "#, + expected_code, expected_code + ); + + let result = harness.run_js(&js_code, TIMEOUT).await; + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); + } + }; +} + +stop_code_test!(stream_stop_code_zero, 0); +stop_code_test!(stream_stop_code_255, 255); + +// --------------------------------------------------------------------------- +// Server stream use-after-finish/reset/stop/session-close +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn server_write_after_finish() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, _recv) = session.open_bi().await.expect("open_bi failed"); + send.write_all(b"hello").await.expect("write_all failed"); + send.finish().expect("finish failed"); + send.stopped().await.expect("stopped failed"); + let result = send.write_all(b"more").await; + assert!( + matches!(result, Err(WriteError::ClosedStream)), + "expected ClosedStream after finish, got {result:?}" + ); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingBidirectionalStreams.getReader(); + const { value: stream } = await reader.read(); + const sr = stream.readable.getReader(); + let received = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + wt.close(); + return { + success: received === "hello", + message: "received: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_write_after_reset() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, _recv) = session.open_bi().await.expect("open_bi failed"); + send.reset(42).expect("reset failed"); + let result = send.write_all(b"more").await; + assert!( + matches!(result, Err(WriteError::ClosedStream)), + "expected ClosedStream after reset, got {result:?}" + ); + session.close(0, b""); + let _ = session.closed().await; + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + // Give the server time to open a stream, reset it, and attempt write + await wt.closed; + return { success: true, message: "server tested write after reset" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_read_after_stop() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + // Read initial data to ensure stream is established + let mut buf = [0u8; 1024]; + recv.read(&mut buf).await.expect("initial read failed"); + recv.stop(10).expect("stop failed"); + let result = recv.read(&mut buf).await; + // Quinn sets `all_data_read = true` in stop(), so subsequent + // reads return Ok(None) — same as a cleanly finished stream — + // rather than an error. + assert!( + matches!(result, Ok(None)), + "expected Ok(None) after stop, got {result:?}" + ); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new Uint8Array([1])); + // Wait for STOP_SENDING to arrive, then handle writer error + try { + for (let i = 0; i < 100; i++) { + await writer.write(new Uint8Array(1024)); + await new Promise(r => setTimeout(r, 10)); + } + } catch (e) { + // Expected — STOP_SENDING causes writer error + } + wt.close(); + return { success: true, message: "client done" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_read_on_stream_after_session_close() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (_send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + recv.read(&mut [0u8; 1]).await.expect("initial read failed"); + session.close(7, b"done"); + session.closed().await; + let mut buf = [0u8; 1024]; + let result = recv.read(&mut buf).await; + match result { + Err(ReadError::SessionError(SessionError::ConnectionError( + quinn::ConnectionError::LocallyClosed, + ))) => {} + other => panic!( + "expected ReadError::SessionError(ConnectionError::LocallyClosed), got {other:?}" + ), + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new Uint8Array([1])); + try { + await writer.write(new Uint8Array([1])); + await writer.write(new Uint8Array([1])); + } catch (e) { + // Expected — STOP_SENDING causes writer error + } + return { success: true, message: "session closed" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn server_write_on_stream_after_session_close() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let (mut send, mut recv) = session.accept_bi().await.expect("accept_bi failed"); + recv.read(&mut [0u8; 1]).await.expect("initial read failed"); + session.close(7, b"done"); + session.closed().await; + let result = send.write_all(b"test").await; + match result { + Err(WriteError::SessionError(SessionError::ConnectionError( + quinn::ConnectionError::LocallyClosed, + ))) => {} + other => panic!( + "expected WriteError::SessionError(ConnectionError::LocallyClosed), got {other:?}" + ), + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new Uint8Array([1])); + try { await wt.closed; } catch (e) { + if (!(e instanceof WebTransportError)) throw e; + } + return { success: true, message: "session closed" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Stream isolation +// --------------------------------------------------------------------------- + +/// Opens 3 bidi streams on one session. Stream 0 is echoed normally, stream 1 +/// is reset by the client, stream 2 is reset by the server. Verifies that the +/// resets on streams 1 and 2 do not disturb stream 0. +#[tokio::test] +async fn stream_reset_does_not_affect_other_streams() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut tasks = tokio::task::JoinSet::new(); + loop { + match session.accept_bi().await { + Ok((mut send, mut recv)) => { + tasks.spawn(async move { + match recv.read_to_end(1024 * 1024).await { + Ok(data) if data == b"reset-this" => { + send.reset(33).expect("reset failed"); + } + Ok(data) => { + send.write_all(&data).await.expect("echo: write_all failed"); + send.finish().expect("echo: finish failed"); + } + Err(web_transport_quinn::ReadToEndError::ReadError( + ReadError::Reset(_), + )) => { + // Client reset this stream — expected + } + Err(e) => panic!("unexpected read error: {e}"), + } + }); + } + Err(SessionError::WebTransportError(WebTransportError::Closed(_, _))) => break, + Err(e) => panic!("accept_bi failed: {e}"), + } + } + while let Some(result) = tasks.join_next().await { + if let Err(e) = result { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + + const [echoResult, clientResetResult, serverResetResult] = await Promise.allSettled([ + // Stream 0: normal echo + (async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + await writer.write(new TextEncoder().encode("stream0")); + await writer.close(); + let received = ""; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + return received; + })(), + // Stream 1: client resets writer + (async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + await writer.write(new TextEncoder().encode("data")); + const err = new WebTransportError({ message: "abort", streamErrorCode: 42 }); + await writer.abort(err); + })(), + // Stream 2: server resets after reading + (async () => { + const stream = await wt.createBidirectionalStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + await writer.write(new TextEncoder().encode("reset-this")); + await writer.close(); + while (true) { + const { done } = await reader.read(); + if (done) throw new Error("expected reset, got clean finish"); + } + })() + ]); + + wt.close(); + + const echoOk = echoResult.status === "fulfilled" && echoResult.value === "stream0"; + const clientResetOk = clientResetResult.status === "fulfilled"; + const srErr = serverResetResult.reason; + const serverResetOk = serverResetResult.status === "rejected" + && srErr instanceof WebTransportError + && srErr.source === "stream" + && srErr.streamErrorCode === 33; + + return { + success: echoOk && clientResetOk && serverResetOk, + message: "echo=" + echoOk + " clientReset=" + clientResetOk + + " serverReset=" + serverResetOk + + " echoVal=" + JSON.stringify(echoResult.value) + + " srSource=" + (srErr instanceof WebTransportError ? srErr.source : "N/A") + + " srCode=" + (srErr instanceof WebTransportError ? srErr.streamErrorCode : "N/A") + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} diff --git a/web-transport-browser-tests/tests/uni_stream.rs b/web-transport-browser-tests/tests/uni_stream.rs new file mode 100644 index 0000000..deb29d3 --- /dev/null +++ b/web-transport-browser-tests/tests/uni_stream.rs @@ -0,0 +1,332 @@ +use web_transport_browser_tests::harness; +use web_transport_browser_tests::server::ServerHandler; +use web_transport_quinn::{SessionError, WebTransportError}; + +mod common; +use common::{init_tracing, LONG_TIMEOUT, TIMEOUT}; + +// --------------------------------------------------------------------------- +// Client-Initiated +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn uni_stream_client_to_server() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut recv = session.accept_uni().await.expect("accept_uni failed"); + let data = recv + .read_to_end(1024 * 1024) + .await + .expect("read_to_end failed"); + assert_eq!( + String::from_utf8_lossy(&data), + "uni data", + "server should receive 'uni data'" + ); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + await writer.write(new TextEncoder().encode("uni data")); + await writer.close(); + wt.close(); + return { success: true, message: "uni stream sent" }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn uni_stream_client_to_server_multiple() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut collected = Vec::new(); + for _ in 0..3 { + let mut recv = session.accept_uni().await.expect("accept_uni failed"); + let data = recv.read_to_end(1024).await.expect("read_to_end failed"); + collected.push(String::from_utf8_lossy(&data).into_owned()); + } + assert_eq!(collected.len(), 3, "server should receive 3 messages"); + collected.sort(); + assert_eq!(collected, vec!["msg0", "msg1", "msg2"]); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + for (let i = 0; i < 3; i++) { + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + await writer.write(new TextEncoder().encode("msg" + i)); + await writer.close(); + } + wt.close(); + return { success: true, message: "3 uni streams sent" }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn uni_stream_client_large_payload() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut recv = session.accept_uni().await.expect("accept_uni failed"); + let data = recv + .read_to_end(2 * 1024 * 1024) + .await + .expect("read_to_end failed"); + assert_eq!(data.len(), 1024 * 1024, "server should receive 1MB"); + // Spot-check pattern every 4KB: each byte = (index % 251) as u8 + for i in (0..data.len()).step_by(4096) { + assert_eq!(data[i], (i % 251) as u8, "data mismatch at byte {i}"); + } + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const stream = await wt.createUnidirectionalStream(); + const writer = stream.getWriter(); + + const SIZE = 1024 * 1024; + const data = new Uint8Array(SIZE); + for (let i = 0; i < SIZE; i++) data[i] = i % 251; + + const CHUNK = 64 * 1024; + for (let off = 0; off < SIZE; off += CHUNK) { + await writer.write(data.subarray(off, Math.min(off + CHUNK, SIZE))); + } + await writer.close(); + wt.close(); + return { success: true, message: "sent 1MB via uni stream" }; + "#, + LONG_TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +// --------------------------------------------------------------------------- +// Server-Initiated +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn uni_stream_server_to_client() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut send = session.open_uni().await.expect("open_uni failed"); + send.write_all(b"server uni") + .await + .expect("write_all failed"); + send.finish().expect("finish failed"); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingUnidirectionalStreams.getReader(); + const { value: stream } = await reader.read(); + + const sr = stream.getReader(); + let received = ""; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + received += new TextDecoder().decode(value); + } + wt.close(); + return { + success: received === "server uni", + message: "received: " + received + }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn uni_stream_server_to_client_multiple() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + for i in 0u8..3 { + let mut send = session.open_uni().await.expect("open_uni failed"); + send.write_all(&[i]).await.expect("write_all failed"); + send.finish().expect("finish failed"); + } + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingUnidirectionalStreams.getReader(); + const values = []; + for (let i = 0; i < 3; i++) { + const { value: stream, done } = await reader.read(); + if (done) break; + const sr = stream.getReader(); + const { value } = await sr.read(); + values.push(value[0]); + } + wt.close(); + values.sort((a, b) => a - b); + const ok = values.length === 3 && values[0] === 0 && values[1] === 1 && values[2] === 2; + return { success: ok, message: "values: " + JSON.stringify(values) }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +} + +#[tokio::test] +async fn uni_stream_server_large_payload() { + init_tracing(); + + let handler: ServerHandler = Box::new(|session| { + Box::pin(async move { + let mut send = session.open_uni().await.expect("open_uni failed"); + // 64KB of pattern data: each byte = (index % 251) as u8 + let data: Vec = (0..65536).map(|i| (i % 251) as u8).collect(); + send.write_all(&data).await.expect("write_all failed"); + send.finish().expect("finish failed"); + let err = session.closed().await; + assert!( + matches!( + err, + SessionError::WebTransportError(WebTransportError::Closed(_, _)) + ), + "expected WebTransportError::Closed, got {err}" + ); + }) + }); + + let harness = harness::setup(handler).await.unwrap(); + + let result = harness + .run_js( + r#" + const wt = await connectWebTransport(); + const reader = wt.incomingUnidirectionalStreams.getReader(); + const { value: stream } = await reader.read(); + const sr = stream.getReader(); + + const chunks = []; + while (true) { + const { value, done } = await sr.read(); + if (done) break; + chunks.push(value); + } + const received = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0)); + let off = 0; + for (const c of chunks) { received.set(c, off); off += c.length; } + + let ok = received.length === 65536; + for (let i = 0; ok && i < received.length; i++) { + if (received[i] !== (i % 251)) { ok = false; } + } + wt.close(); + return { success: ok, message: "len=" + received.length }; + "#, + TIMEOUT, + ) + .await; + + harness.teardown().await; + let result = result.unwrap(); + assert!(result.success, "{}", result.message); +}