From dec96436772007178a2c9190d87598893a38b57d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 12 Dec 2024 17:05:38 +0200 Subject: [PATCH] fix: fix silent failure to add data of more than ~16MB via add_bytes or add_bytes_named (#36) * WIP * Add a very basic test for quinn rpc * Add tests that use the rpc client with quinn. Some errors don't happen if you use the mem transport. Also fix bug where add_bytes fails if the bytes are larger than the max frame size. * use published quic-rpc --- .gitignore | 1 + Cargo.lock | 99 ++++++++++++++++++++++++++ Cargo.toml | 3 +- src/rpc/client/blobs.rs | 16 +++-- tests/rpc.rs | 153 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 266 insertions(+), 6 deletions(-) create mode 100644 tests/rpc.rs diff --git a/.gitignore b/.gitignore index fe8147e8..e31b44fb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target iroh.config.toml +.vscode/* diff --git a/Cargo.lock b/Cargo.lock index ae11cb1a..4f6be782 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -218,6 +218,15 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4" +dependencies = [ + "critical-section", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -363,6 +372,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -667,6 +685,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -961,6 +985,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "elliptic-curve" version = "0.13.8" @@ -1010,6 +1046,26 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "enum-ordinalize" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "enumflags2" version = "0.7.10" @@ -1444,6 +1500,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -1468,6 +1533,20 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "heapless" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "serde", + "spin", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.5.0" @@ -3252,6 +3331,7 @@ dependencies = [ "cobs", "embedded-io 0.4.0", "embedded-io 0.6.1", + "heapless", "postcard-derive", "serde", ] @@ -3440,17 +3520,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7a980daf521a275ae2a04fefc311c96fd0cf11ae430324d1b914d072bcc408b" dependencies = [ "anyhow", + "bytes", "derive_more", "flume", "futures-lite 2.5.0", "futures-sink", "futures-util", + "iroh-quinn", "pin-project", + "postcard", "serde", "slab", "smallvec", "time", "tokio", + "tokio-serde", "tokio-util", "tracing", ] @@ -4803,6 +4887,21 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "tokio-serde" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf600e7036b17782571dd44fa0a5cea3c82f60db5137f774a325a76a0d6852b" +dependencies = [ + "bincode", + "bytes", + "educe", + "futures-core", + "futures-sink", + "pin-project", + "serde", +] + [[package]] name = "tokio-stream" version = "0.1.17" diff --git a/Cargo.toml b/Cargo.toml index 07cdd623..0a936e00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ postcard = { version = "1", default-features = false, features = [ "use-std", "experimental-derive", ] } -quic-rpc = { version = "0.17", optional = true } +quic-rpc = { version = "0.17.1", optional = true } quic-rpc-derive = { version = "0.17", optional = true } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } rand = "0.8" @@ -119,6 +119,7 @@ example-iroh = [ "dep:console", "iroh/discovery-local-network" ] +test = ["quic-rpc/quinn-transport"] [package.metadata.docs.rs] all-features = true diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 454617a8..1809b6aa 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -268,10 +268,10 @@ where } }); tokio::spawn(async move { - // TODO: Is it important to catch this error? It should also result in an error on the - // response stream. If we deem it important, we could one-shot send it into the - // BlobAddProgress and return from there. Not sure. if let Err(err) = sink.send_all(&mut input).await { + // if we get an error in send_all due to the connection being closed, this will just fail again. + // if we get an error due to something else (serialization or size limit), tell the remote to abort. + sink.send(AddStreamUpdate::Abort).await.ok(); warn!("Failed to send input stream to remote: {err:?}"); } }); @@ -281,7 +281,7 @@ where /// Write a blob by passing bytes. pub async fn add_bytes(&self, bytes: impl Into) -> anyhow::Result { - let input = futures_lite::stream::once(Ok(bytes.into())); + let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok); self.add_stream(input, SetTagOption::Auto).await?.await } @@ -291,7 +291,7 @@ where bytes: impl Into, name: impl Into, ) -> anyhow::Result { - let input = futures_lite::stream::once(Ok(bytes.into())); + let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok); self.add_stream(input, SetTagOption::Named(name.into())) .await? .await @@ -987,6 +987,12 @@ pub struct DownloadOptions { pub mode: DownloadMode, } +fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream { + futures_lite::stream::iter(std::iter::from_fn(move || { + Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty()) + })) +} + #[cfg(test)] mod tests { use std::{path::Path, time::Duration}; diff --git a/tests/rpc.rs b/tests/rpc.rs new file mode 100644 index 00000000..5eab6127 --- /dev/null +++ b/tests/rpc.rs @@ -0,0 +1,153 @@ +#![cfg(feature = "test")] +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; + +use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool}; +use quic_rpc::transport::quinn::QuinnConnector; +use quinn::{ + crypto::rustls::{QuicClientConfig, QuicServerConfig}, + rustls, ClientConfig, Endpoint, ServerConfig, +}; +use rcgen::CertifiedKey; +use tempfile::TempDir; +use testresult::TestResult; +use tokio_util::task::AbortOnDropHandle; + +type QC = QuinnConnector; +type BlobsClient = iroh_blobs::rpc::client::blobs::Client; + +/// Builds default quinn client config and trusts given certificates. +/// +/// ## Args +/// +/// - server_certs: a list of trusted certificates in DER format. +fn configure_client(server_certs: &[CertifiedKey]) -> anyhow::Result { + let mut certs = rustls::RootCertStore::empty(); + for cert in server_certs { + let cert = cert.cert.der().clone(); + certs.add(cert)?; + } + + let crypto_client_config = rustls::ClientConfig::builder_with_provider(Arc::new( + rustls::crypto::ring::default_provider(), + )) + .with_protocol_versions(&[&rustls::version::TLS13]) + .expect("valid versions") + .with_root_certificates(certs) + .with_no_client_auth(); + let quic_client_config = QuicClientConfig::try_from(crypto_client_config)?; + + Ok(ClientConfig::new(Arc::new(quic_client_config))) +} + +/// Returns default server configuration along with its certificate. +#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527 +fn configure_server() -> anyhow::Result<(ServerConfig, CertifiedKey)> { + let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?; + let cert_der = cert.cert.der(); + let priv_key = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); + let cert_chain = vec![cert_der.clone()]; + + let crypto_server_config = rustls::ServerConfig::builder_with_provider(Arc::new( + rustls::crypto::ring::default_provider(), + )) + .with_protocol_versions(&[&rustls::version::TLS13]) + .expect("valid versions") + .with_no_client_auth() + .with_single_cert(cert_chain, priv_key.into())?; + let quic_server_config = QuicServerConfig::try_from(crypto_server_config)?; + let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config)); + + Arc::get_mut(&mut server_config.transport) + .unwrap() + .max_concurrent_uni_streams(0_u8.into()); + + Ok((server_config, cert)) +} + +pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, CertifiedKey)> { + let (server_config, server_cert) = configure_server()?; + let endpoint = Endpoint::server(server_config, bind_addr)?; + Ok((endpoint, server_cert)) +} + +pub fn make_client_endpoint( + bind_addr: SocketAddr, + server_certs: &[CertifiedKey], +) -> anyhow::Result { + let client_cfg = configure_client(server_certs)?; + let mut endpoint = Endpoint::client(bind_addr)?; + endpoint.set_default_client_config(client_cfg); + Ok(endpoint) +} + +/// An iroh node that just has the blobs transport +#[derive(Debug)] +pub struct Node { + pub router: iroh::protocol::Router, + pub blobs: Blobs, + pub local_pool: LocalPool, + pub rpc_task: AbortOnDropHandle<()>, +} + +impl Node { + pub async fn new(path: PathBuf) -> anyhow::Result<(Self, SocketAddr, CertifiedKey)> { + let store = iroh_blobs::store::fs::Store::load(path).await?; + let local_pool = LocalPool::default(); + let endpoint = iroh::Endpoint::builder().bind().await?; + let blobs = Blobs::builder(store).build(local_pool.handle(), &endpoint); + let router = iroh::protocol::Router::builder(endpoint) + .accept(iroh_blobs::ALPN, blobs.clone()) + .spawn() + .await?; + let (config, key) = configure_server()?; + let endpoint = quinn::Endpoint::server(config, "127.0.0.1:0".parse().unwrap())?; + let local_addr = endpoint.local_addr()?; + let rpc_server = quic_rpc::transport::quinn::QuinnListener::new(endpoint)?; + let rpc_server = + quic_rpc::RpcServer::::new(rpc_server); + let blobs2 = blobs.clone(); + let rpc_task = rpc_server + .spawn_accept_loop(move |msg, chan| blobs2.clone().handle_rpc_request(msg, chan)); + let node = Self { + router, + blobs, + local_pool, + rpc_task, + }; + Ok((node, local_addr, key)) + } +} + +async fn node_and_client() -> TestResult<(Node, BlobsClient, TempDir)> { + let testdir = tempfile::tempdir()?; + let (node, addr, key) = Node::new(testdir.path().join("blobs")).await?; + let client = make_client_endpoint("127.0.0.1:0".parse().unwrap(), &[key])?; + let client = QuinnConnector::new(client, addr, "localhost".to_string()); + let client = quic_rpc::RpcClient::::new(client); + let client = iroh_blobs::rpc::client::blobs::Client::new(client); + Ok((node, client, testdir)) +} + +#[tokio::test] +async fn quinn_rpc_smoke() -> TestResult<()> { + let _ = tracing_subscriber::fmt::try_init(); + let (_node, client, _testdir) = node_and_client().await?; + let data = b"hello"; + let hash = client.add_bytes(data.to_vec()).await?.hash; + assert_eq!(hash, iroh_blobs::Hash::new(data)); + let data2 = client.read_to_bytes(hash).await?; + assert_eq!(data, &data2[..]); + Ok(()) +} + +#[tokio::test] +async fn quinn_rpc_large() -> TestResult<()> { + let _ = tracing_subscriber::fmt::try_init(); + let (_node, client, _testdir) = node_and_client().await?; + let data = vec![0; 1024 * 1024 * 16]; + let hash = client.add_bytes(data.clone()).await?.hash; + assert_eq!(hash, iroh_blobs::Hash::new(&data)); + let data2 = client.read_to_bytes(hash).await?; + assert_eq!(data, &data2[..]); + Ok(()) +}