From 430e19a9cb4642eea7c9209b97deae05120d71ef Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 14 Nov 2024 13:50:01 +0100 Subject: [PATCH] refactor(iroh): remove iroh-blobs --- Cargo.lock | 1 - iroh-cli/src/main.rs | 1 + {iroh/src/util => iroh-cli/src}/progress.rs | 0 iroh/Cargo.toml | 5 +- iroh/src/client.rs | 12 - iroh/src/lib.rs | 2 - iroh/src/node.rs | 84 +-- iroh/src/node/builder.rs | 194 +------ iroh/src/node/rpc.rs | 51 +- iroh/src/rpc_protocol.rs | 2 - iroh/src/util.rs | 1 - iroh/tests/batch.rs | 247 --------- iroh/tests/gc.rs | 459 ----------------- iroh/tests/provide.rs | 542 -------------------- 14 files changed, 48 insertions(+), 1553 deletions(-) rename {iroh/src/util => iroh-cli/src}/progress.rs (100%) delete mode 100644 iroh/tests/batch.rs delete mode 100644 iroh/tests/gc.rs delete mode 100644 iroh/tests/provide.rs diff --git a/Cargo.lock b/Cargo.lock index ea7e67a195c..ed0a932531c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2620,7 +2620,6 @@ dependencies = [ "indicatif", "iroh", "iroh-base", - "iroh-blobs", "iroh-io", "iroh-metrics", "iroh-net", diff --git a/iroh-cli/src/main.rs b/iroh-cli/src/main.rs index 24e8b2e3d04..4fefb80b5fa 100644 --- a/iroh-cli/src/main.rs +++ b/iroh-cli/src/main.rs @@ -6,6 +6,7 @@ use clap::Parser; mod commands; mod config; mod logging; +mod progress; use crate::commands::Cli; diff --git a/iroh/src/util/progress.rs b/iroh-cli/src/progress.rs similarity index 100% rename from iroh/src/util/progress.rs rename to iroh-cli/src/progress.rs diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 17e650210a4..94dc630c192 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -37,7 +37,6 @@ genawaiter = { version = "0.99", default-features = false, features = [ "futures03", ] } hex = { version = "0.4.3" } -iroh-blobs = { version = "0.28.0", features = ["downloader"] } iroh-base = { version = "0.28.0", features = ["key"] } iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.28.0", optional = true } @@ -81,8 +80,8 @@ serde-error = "0.1.3" [features] default = ["metrics", "fs-store"] -metrics = ["iroh-metrics", "iroh-blobs/metrics"] -fs-store = ["iroh-blobs/fs-store"] +metrics = ["iroh-metrics"] +fs-store = [] test = [] examples = ["dep:clap", "dep:indicatif"] discovery-local-network = [ diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 6fdbe639a47..1098f578148 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -14,8 +14,6 @@ pub use crate::rpc_protocol::RpcService; mod quic; -pub use iroh_blobs::rpc::client::{blobs, tags}; - pub use self::net::NodeStatus; pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; pub mod net; @@ -57,16 +55,6 @@ impl Iroh { self.rpc.clone() } - /// Returns the blobs client. - pub fn blobs(&self) -> blobs::Client { - blobs::Client::new(self.rpc.clone().map().boxed()) - } - - /// Returns the tags client. - pub fn tags(&self) -> tags::Client { - tags::Client::new(self.rpc.clone().map().boxed()) - } - /// Returns the net client. pub fn net(&self) -> &net::Client { net::Client::ref_cast(&self.rpc) diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index e5004e72e4d..d5662459df8 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -91,8 +91,6 @@ #[doc(inline)] pub use iroh_base as base; #[doc(inline)] -pub use iroh_blobs as blobs; -#[doc(inline)] pub use iroh_net as net; #[doc(inline)] pub use iroh_router as router; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 7785ca95c1f..4753f895259 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -38,7 +38,6 @@ use std::{ collections::BTreeSet, fmt::Debug, - marker::PhantomData, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -49,11 +48,6 @@ use anyhow::{anyhow, Result}; use futures_lite::StreamExt; use futures_util::future::{MapErr, Shared}; use iroh_base::key::PublicKey; -use iroh_blobs::{ - net_protocol::Blobs as BlobsProtocol, - store::Store as BaoStore, - util::local_pool::{LocalPool, LocalPoolHandle}, -}; use iroh_net::{ endpoint::{DirectAddrsStream, RemoteInfo}, AddrInfo, Endpoint, NodeAddr, @@ -73,9 +67,7 @@ mod rpc_status; pub(crate) use self::rpc::RpcResult; pub use self::{ - builder::{ - Builder, DiscoveryConfig, GcPolicy, ProtocolBuilder, StorageConfig, DEFAULT_RPC_ADDR, - }, + builder::{Builder, DiscoveryConfig, ProtocolBuilder, StorageConfig, DEFAULT_RPC_ADDR}, rpc_status::RpcStatus, }; @@ -101,8 +93,8 @@ pub type IrohServerEndpoint = quic_rpc::transport::boxed::BoxedListener< /// await the [`Node`] struct directly, it will complete when the task completes. If /// this is dropped the node task is not stopped but keeps running. #[derive(Debug, Clone)] -pub struct Node { - inner: Arc>, +pub struct Node { + inner: Arc, // `Node` needs to be `Clone + Send`, and we need to `task.await` in its `shutdown()` impl. // So we need // - `Shared` so we can `task.await` from all `Node` clones @@ -116,43 +108,37 @@ pub struct Node { pub(crate) type JoinErrToStr = Box String + Send + Sync + 'static>; #[derive(derive_more::Debug)] -struct NodeInner { - db: PhantomData, +struct NodeInner { rpc_addr: Option, endpoint: Endpoint, cancel_token: CancellationToken, client: crate::client::Iroh, - local_pool_handle: LocalPoolHandle, } /// In memory node. -pub type MemNode = Node; +#[deprecated] +pub type MemNode = Node; /// Persistent node. -pub type FsNode = Node; +#[deprecated] +pub type FsNode = Node; -impl MemNode { +impl Node { /// Returns a new builder for the [`Node`], by default configured to run in memory. /// /// Once done with the builder call [`Builder::spawn`] to create the node. - pub fn memory() -> Builder { - Builder::default() + pub fn memory() -> Builder { + Builder::memory() } -} -impl FsNode { /// Returns a new builder for the [`Node`], configured to persist all data /// from the given path. /// /// Once done with the builder call [`Builder::spawn`] to create the node. - pub async fn persistent( - root: impl AsRef, - ) -> Result> { - Builder::default().persist(root).await + pub async fn persistent(root: impl AsRef) -> Result { + Builder::memory().persist(root).await } -} -impl Node { /// Returns the [`Endpoint`] of the node. /// /// This can be used to establish connections to other nodes under any @@ -196,11 +182,6 @@ impl Node { &self.inner.client } - /// Returns a reference to the used `LocalPoolHandle`. - pub fn local_pool_handle(&self) -> &LocalPoolHandle { - &self.inner.local_pool_handle - } - /// Get the relay server we are connected to. pub fn home_relay(&self) -> Option { self.inner.endpoint.home_relay() @@ -243,7 +224,7 @@ impl Node { } } -impl std::ops::Deref for Node { +impl std::ops::Deref for Node { type Target = crate::client::Iroh; fn deref(&self) -> &Self::Target { @@ -251,7 +232,7 @@ impl std::ops::Deref for Node { } } -impl NodeInner { +impl NodeInner { async fn local_endpoint_addresses(&self) -> Result> { let endpoints = self .endpoint @@ -268,10 +249,7 @@ impl NodeInner { external_rpc: IrohServerEndpoint, internal_rpc: IrohServerEndpoint, router: Router, - gc_policy: GcPolicy, - gc_done_callback: Option>, nodes_data_path: Option, - local_pool: LocalPool, ) { let (ipv4, ipv6) = self.endpoint.bound_sockets(); debug!( @@ -287,37 +265,6 @@ impl NodeInner { let external_rpc = RpcServer::new(external_rpc); let internal_rpc = RpcServer::new(internal_rpc); - // Spawn a task for the garbage collection. - if let GcPolicy::Interval(gc_period) = gc_policy { - let router = router.clone(); - let handle = local_pool.spawn(move || async move { - let blobs = router - .get_protocol::>(iroh_blobs::protocol::ALPN) - .expect("missing blobs"); - - blobs - .store() - .gc_run( - iroh_blobs::store::GcConfig { - period: gc_period, - done_callback: gc_done_callback, - }, - || async move { BTreeSet::default() }, - ) - .await; - }); - // We cannot spawn tasks that run on the local pool directly into the join set, - // so instead we create a new task that supervises the local task. - join_set.spawn({ - async move { - if let Err(err) = handle.await { - return Err(anyhow::Error::from(err)); - } - Ok(()) - } - }); - } - if let Some(nodes_data_path) = nodes_data_path { let ep = self.endpoint.clone(); let token = self.cancel_token.clone(); @@ -419,7 +366,6 @@ impl NodeInner { // Abort remaining local tasks. tracing::info!("Shutting down local pool"); - local_pool.shutdown().await; } } diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 8de15131f00..ca18703683d 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -9,13 +9,6 @@ use anyhow::{Context, Result}; use futures_lite::StreamExt; use futures_util::{FutureExt as _, TryFutureExt as _}; use iroh_base::key::SecretKey; -use iroh_blobs::{ - downloader::Downloader, - net_protocol::Blobs as BlobsProtocol, - provider::EventSender, - store::{Map, Store as BaoStore}, - util::local_pool::{self, LocalPool, LocalPoolHandle, PanicMode}, -}; #[cfg(not(test))] use iroh_net::discovery::local_swarm_discovery::LocalSwarmDiscovery; use iroh_net::{ @@ -26,10 +19,9 @@ use iroh_net::{ }; use iroh_router::{ProtocolHandler, RouterBuilder}; use quic_rpc::transport::{boxed::BoxableListener, quinn::QuinnListener}; -use serde::{Deserialize, Serialize}; use tokio::task::JoinError; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; -use tracing::{debug, error_span, trace, Instrument}; +use tracing::{error_span, trace, Instrument}; use super::{rpc_status::RpcStatus, IrohServerEndpoint, JoinErrToStr, Node, NodeInner}; use crate::{ @@ -45,9 +37,6 @@ pub const DEFAULT_BIND_PORT: u16 = 11204; /// How long we wait at most for some endpoints to be discovered. const ENDPOINT_WAIT: Duration = Duration::from_secs(5); -/// Default interval between GC runs. -const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); - /// The default bind address for the iroh IPv4 socket. pub const DEFAULT_BIND_ADDR_V4: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_BIND_PORT); @@ -80,28 +69,19 @@ pub const DEFAULT_BIND_ADDR_V6: SocketAddrV6 = /// /// [number 0]: https://n0.computer #[derive(derive_more::Debug)] -pub struct Builder -where - D: Map, -{ +pub struct Builder { storage: StorageConfig, addr_v4: SocketAddrV4, addr_v6: SocketAddrV6, secret_key: SecretKey, rpc_endpoint: IrohServerEndpoint, rpc_addr: Option, - blobs_store: D, keylog: bool, relay_mode: RelayMode, - gc_policy: GcPolicy, dns_resolver: Option, node_discovery: DiscoveryConfig, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, - /// Callback to register when a gc loop is done - #[debug("callback")] - gc_done_callback: Option>, - blob_events: EventSender, transport_config: Option, } @@ -191,8 +171,9 @@ fn mk_external_rpc() -> IrohServerEndpoint { quic_rpc::transport::boxed::BoxedListener::new(DummyServerEndpoint) } -impl Default for Builder { - fn default() -> Self { +impl Builder { + /// Creates a default node builder with in memory configuration. + pub fn memory() -> Self { // Use staging in testing let relay_mode = match force_staging_infra() { true => RelayMode::Staging, @@ -204,26 +185,20 @@ impl Default for Builder { addr_v4: DEFAULT_BIND_ADDR_V4, addr_v6: DEFAULT_BIND_ADDR_V6, secret_key: SecretKey::generate(), - blobs_store: Default::default(), keylog: false, relay_mode, dns_resolver: None, rpc_endpoint: mk_external_rpc(), rpc_addr: None, - gc_policy: GcPolicy::Disabled, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, - gc_done_callback: None, - blob_events: Default::default(), transport_config: None, } } -} -impl Builder { /// Creates a new builder for [`Node`] using the given databases. - pub fn with_db_and_store(blobs_store: D, storage: StorageConfig) -> Self { + pub fn with_db_and_store(storage: StorageConfig) -> Self { // Use staging in testing let relay_mode = match force_staging_infra() { true => RelayMode::Staging, @@ -235,51 +210,23 @@ impl Builder { addr_v4: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_BIND_PORT), addr_v6: SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, DEFAULT_BIND_PORT + 1, 0, 0), secret_key: SecretKey::generate(), - blobs_store, keylog: false, relay_mode, dns_resolver: None, rpc_endpoint: mk_external_rpc(), rpc_addr: None, - gc_policy: GcPolicy::Disabled, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, - gc_done_callback: None, - blob_events: Default::default(), transport_config: None, } } } -impl Builder -where - D: BaoStore, -{ - /// Configure a blob events sender. This will replace the previous blob - /// event sender. By default, no events are sent. - /// - /// To define an event sender, implement the [`iroh_blobs::provider::CustomEventSender`] trait. - pub fn blobs_events(mut self, blob_events: impl Into) -> Self { - self.blob_events = blob_events.into(); - self - } - +impl Builder { /// Persist all node data in the provided directory. - pub async fn persist( - self, - root: impl AsRef, - ) -> Result> { + pub async fn persist(self, root: impl AsRef) -> Result { let root = root.as_ref(); - let blob_dir = IrohPaths::BaoStoreDir.with_root(root); - - tokio::fs::create_dir_all(&blob_dir).await?; - let blobs_store = iroh_blobs::store::fs::Store::load(&blob_dir) - .await - .with_context(|| { - format!("Failed to load blobs database from {}", blob_dir.display()) - })?; - let secret_key_path = IrohPaths::SecretKey.with_root(root); let secret_key = load_secret_key(secret_key_path).await?; @@ -288,18 +235,14 @@ where addr_v4: self.addr_v4, addr_v6: self.addr_v6, secret_key, - blobs_store, keylog: self.keylog, rpc_endpoint: self.rpc_endpoint, rpc_addr: self.rpc_addr, relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, - gc_policy: self.gc_policy, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, - gc_done_callback: self.gc_done_callback, - blob_events: self.blob_events, transport_config: self.transport_config, }) } @@ -314,12 +257,12 @@ where } /// Configure the default iroh rpc endpoint, on the default address. - pub async fn enable_rpc(self) -> Result> { + pub async fn enable_rpc(self) -> Result { self.enable_rpc_with_addr(DEFAULT_RPC_ADDR).await } /// Configure the default iroh rpc endpoint. - pub async fn enable_rpc_with_addr(self, mut rpc_addr: SocketAddr) -> Result> { + pub async fn enable_rpc_with_addr(self, mut rpc_addr: SocketAddr) -> Result { let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, rpc_addr)?; rpc_addr.set_port(actual_rpc_port); @@ -336,14 +279,6 @@ where }) } - /// Sets the garbage collection policy. - /// - /// By default garbage collection is disabled. - pub fn gc_policy(mut self, gc_policy: GcPolicy) -> Self { - self.gc_policy = gc_policy; - self - } - /// Sets the relay servers to assist in establishing connectivity. /// /// Relay servers are used to discover other nodes by `PublicKey` and also help @@ -454,14 +389,6 @@ where self } - /// Register a callback for when GC is done. - #[cfg(any(test, feature = "test-utils"))] - #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))] - pub fn register_gc_done_cb(mut self, cb: Box) -> Self { - self.gc_done_callback.replace(cb); - self - } - /// Whether to log the SSL pre-master key. /// /// If `true` and the `SSLKEYLOGFILE` environment variable is the path to a file this @@ -477,7 +404,7 @@ where /// This will create the underlying network server and spawn a tokio task accepting /// connections. The returned [`Node`] can be used to control the task as well as /// get information about it. - pub async fn spawn(self) -> Result> { + pub async fn spawn(self) -> Result { let unspawned_node = self.build().await?; unspawned_node.spawn().await } @@ -486,24 +413,8 @@ where /// /// Returns a [`ProtocolBuilder`], on which custom protocols can be registered with /// [`ProtocolBuilder::accept`]. To spawn the node, call [`ProtocolBuilder::spawn`]. - pub async fn build(self) -> Result> { - // Clone the blob store to shutdown in case of error. - let blobs_store = self.blobs_store.clone(); - match self.build_inner().await { - Ok(node) => Ok(node), - Err(err) => { - blobs_store.shutdown().await; - Err(err) - } - } - } - - async fn build_inner(self) -> Result> { + pub async fn build(self) -> Result { trace!("building node"); - let lp = LocalPool::new(local_pool::Config { - panic_mode: PanicMode::LogAndContinue, - ..Default::default() - }); let (endpoint, nodes_data_path) = { let discovery: Option> = match self.node_discovery { DiscoveryConfig::None => None, @@ -590,9 +501,6 @@ where let addr = endpoint.node_addr().await?; trace!("endpoint address: {addr:?}"); - // Initialize the downloader. - let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone()); - // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); let internal_rpc = quic_rpc::transport::boxed::BoxedListener::new(internal_rpc); @@ -603,11 +511,9 @@ where let inner = Arc::new(NodeInner { rpc_addr: self.rpc_addr, - db: Default::default(), endpoint: endpoint.clone(), client, cancel_token: CancellationToken::new(), - local_pool_handle: lp.handle().clone(), }); let protocol_builder = ProtocolBuilder { @@ -615,18 +521,9 @@ where router: RouterBuilder::new(endpoint), internal_rpc, external_rpc: self.rpc_endpoint, - gc_policy: self.gc_policy, - gc_done_callback: self.gc_done_callback, nodes_data_path, - local_pool: lp, }; - let protocol_builder = protocol_builder.register_iroh_protocols( - self.blob_events, - self.blobs_store, - downloader, - ); - Ok(protocol_builder) } } @@ -640,19 +537,15 @@ where /// Note that RPC calls performed with client returned from [`Self::client`] will not complete /// until the node is spawned. #[derive(derive_more::Debug)] -pub struct ProtocolBuilder { - inner: Arc>, +pub struct ProtocolBuilder { + inner: Arc, internal_rpc: IrohServerEndpoint, external_rpc: IrohServerEndpoint, router: RouterBuilder, - #[debug("callback")] - gc_done_callback: Option>, - gc_policy: GcPolicy, nodes_data_path: Option, - local_pool: LocalPool, } -impl ProtocolBuilder { +impl ProtocolBuilder { /// Registers a protocol handler for incoming connections. /// /// Use this to register custom protocols onto the iroh node. Whenever a new connection for @@ -717,11 +610,6 @@ impl ProtocolBuilder { &self.inner.endpoint } - /// Returns a reference to the used [`LocalPoolHandle`]. - pub fn local_pool_handle(&self) -> &LocalPoolHandle { - self.local_pool.handle() - } - /// Returns a protocol handler for an ALPN. /// /// This downcasts to the concrete type and returns `None` if the handler registered for `alpn` @@ -730,37 +618,14 @@ impl ProtocolBuilder { self.router.get_protocol::

(alpn) } - /// Registers the core iroh protocols (blobs, gossip, docs). - fn register_iroh_protocols( - mut self, - blob_events: EventSender, - store: D, - downloader: Downloader, - ) -> Self { - // Register blobs. - let blobs_proto = BlobsProtocol::new_with_events( - store, - self.local_pool_handle().clone(), - blob_events, - downloader, - self.endpoint().clone(), - ); - self = self.accept(iroh_blobs::protocol::ALPN.to_vec(), Arc::new(blobs_proto)); - - self - } - /// Spawns the node and starts accepting connections. - pub async fn spawn(self) -> Result> { + pub async fn spawn(self) -> Result { let Self { inner, internal_rpc, external_rpc, router, - gc_done_callback, - gc_policy, nodes_data_path, - local_pool: rt, } = self; let node_id = inner.endpoint.node_id(); @@ -769,15 +634,7 @@ impl ProtocolBuilder { // Spawn the main task and store it in the node for structured termination in shutdown. let fut = inner .clone() - .run( - external_rpc, - internal_rpc, - router.clone(), - gc_policy, - gc_done_callback, - nodes_data_path, - rt, - ) + .run(external_rpc, internal_rpc, router.clone(), nodes_data_path) .instrument(error_span!("node", me=%node_id.fmt_short())); let task = tokio::task::spawn(fut); @@ -811,23 +668,6 @@ impl ProtocolBuilder { } } -/// Policy for garbage collection. -// Please note that this is documented in the `iroh.computer` repository under -// `src/app/docs/reference/config/page.mdx`. Any changes to this need to be updated there. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum GcPolicy { - /// Garbage collection is disabled. - Disabled, - /// Garbage collection is run at the given interval. - Interval(Duration), -} - -impl Default for GcPolicy { - fn default() -> Self { - Self::Interval(DEFAULT_GC_INTERVAL) - } -} - const DEFAULT_RPC_PORT: u16 = 0x1337; const MAX_RPC_STREAMS: u32 = 1024; diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 47fc2cf0302..479d010ae2a 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -2,10 +2,6 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; use anyhow::Result; use futures_lite::Stream; -use iroh_blobs::{ - net_protocol::Blobs as BlobsProtocol, store::Store as BaoStore, - util::local_pool::LocalPoolHandle, -}; use iroh_net::{NodeAddr, NodeId}; use iroh_router::Router; use quic_rpc::server::{RpcChannel, RpcServerError}; @@ -33,26 +29,23 @@ pub(crate) type RpcError = serde_error::Error; pub(crate) type RpcResult = Result; #[derive(Debug, Clone)] -pub(crate) struct Handler { - pub(crate) inner: Arc>, - pub(crate) router: Router, +pub(crate) struct Handler { + pub(crate) inner: Arc, + pub(crate) _router: Router, } -impl Handler { - pub fn new(inner: Arc>, router: Router) -> Self { - Self { inner, router } +impl Handler { + pub fn new(inner: Arc, router: Router) -> Self { + Self { + inner, + _router: router, + } } } -impl Handler { - fn blobs(&self) -> Arc> { - self.router - .get_protocol::>(iroh_blobs::protocol::ALPN) - .expect("missing blobs") - } - +impl Handler { pub(crate) fn spawn_rpc_request( - inner: Arc>, + inner: Arc, join_set: &mut JoinSet>, accepting: quic_rpc::server::Accepting, router: Router, @@ -102,17 +95,6 @@ impl Handler { } } - async fn handle_blobs_and_tags_request( - self, - msg: iroh_blobs::rpc::proto::Request, - chan: RpcChannel, - ) -> Result<(), RpcServerError> { - self.blobs() - .handle_rpc_request(msg, chan) - .await - .map_err(|e| e.errors_into()) - } - pub(crate) async fn handle_rpc_request( self, msg: Request, @@ -123,10 +105,6 @@ impl Handler { match msg { Net(msg) => self.handle_net_request(msg, chan).await, Node(msg) => self.handle_node_request(msg, chan).await, - BlobsAndTags(msg) => { - self.handle_blobs_and_tags_request(msg, chan.map().boxed()) - .await - } } } @@ -205,10 +183,6 @@ impl Handler { }) } - fn local_pool_handle(&self) -> LocalPoolHandle { - self.inner.local_pool_handle.clone() - } - fn remote_infos_iter( self, _: RemoteInfosIterRequest, @@ -217,7 +191,8 @@ impl Handler { let (tx, rx) = async_channel::bounded(32); let mut infos: Vec<_> = self.inner.endpoint.remote_info_iter().collect(); infos.sort_by_key(|n| n.node_id.to_string()); - self.local_pool_handle().spawn_detached(|| async move { + // TODO: track tasks + tokio::task::spawn(async move { for info in infos { tx.send(Ok(RemoteInfosIterResponse { info })).await.ok(); } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index a2597a1764c..74d3a9fc4ba 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -31,7 +31,6 @@ pub struct RpcService; pub enum Request { Node(node::Request), Net(net::Request), - BlobsAndTags(iroh_blobs::rpc::proto::Request), } /// The response enum, listing all possible responses. @@ -41,7 +40,6 @@ pub enum Request { pub enum Response { Node(node::Response), Net(net::Response), - BlobsAndTags(iroh_blobs::rpc::proto::Response), } impl quic_rpc::Service for RpcService { diff --git a/iroh/src/util.rs b/iroh/src/util.rs index a4c57778ed3..9c70fc741f9 100644 --- a/iroh/src/util.rs +++ b/iroh/src/util.rs @@ -3,4 +3,3 @@ pub mod fs; pub mod io; pub mod path; -pub mod progress; diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs deleted file mode 100644 index c3289d03e6c..00000000000 --- a/iroh/tests/batch.rs +++ /dev/null @@ -1,247 +0,0 @@ -use std::{io, time::Duration}; - -use bao_tree::blake3; -use bytes::Bytes; -use futures_lite::StreamExt; -use iroh::{ - client::blobs::{AddDirOpts, WrapOption}, - node::GcPolicy, -}; -use iroh_blobs::store::mem::Store; - -async fn create_node() -> anyhow::Result<(iroh::node::Node, async_channel::Receiver<()>)> { - let (gc_send, gc_recv) = async_channel::unbounded(); - let node = iroh::node::Node::memory() - .gc_policy(GcPolicy::Interval(Duration::from_millis(10))) - .register_gc_done_cb(Box::new(move || { - gc_send.send_blocking(()).ok(); - })) - .spawn() - .await?; - Ok((node, gc_recv)) -} - -async fn wait_for_gc(chan: &mut async_channel::Receiver<()>) { - let _ = chan.drain(); - for _ in 0..5 { - chan.recv().await.unwrap(); - } -} - -/// Test that add_bytes adds the right data -#[tokio::test] -async fn add_bytes() -> anyhow::Result<()> { - let (node, _) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let data: &[u8] = b"test"; - let tag = batch.add_bytes(data).await?; - let hash = *tag.hash(); - let actual = client.read_to_bytes(hash).await?; - assert_eq!(hash, blake3::hash(data).into()); - assert_eq!(actual.as_ref(), data); - Ok(()) -} - -/// Test that add_bytes adds the right data -#[tokio::test] -async fn add_stream() -> anyhow::Result<()> { - let (node, _) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let data: &[u8] = b"test"; - let data_stream = futures_lite::stream::iter([io::Result::Ok(Bytes::copy_from_slice(data))]); - let tag = batch.add_stream(data_stream).await?; - let hash = *tag.hash(); - let actual = client.read_to_bytes(hash).await?; - assert_eq!(hash, blake3::hash(data).into()); - assert_eq!(actual.as_ref(), data); - Ok(()) -} - -/// Test that add_file adds the right data -#[tokio::test] -async fn add_file() -> anyhow::Result<()> { - let (node, _) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let dir = tempfile::tempdir()?; - let temp_path = dir.path().join("test"); - std::fs::write(&temp_path, b"test")?; - let (tag, _) = batch.add_file(temp_path).await?; - let hash = *tag.hash(); - let actual = client.read_to_bytes(hash).await?; - assert_eq!(hash, blake3::hash(b"test").into()); - assert_eq!(actual.as_ref(), b"test"); - Ok(()) -} - -/// Tests that add_dir adds the right data -#[tokio::test] -async fn add_dir() -> anyhow::Result<()> { - let (node, _) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let dir = tempfile::tempdir()?; - let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")]; - for (name, content) in &data { - let temp_path = dir.path().join(name); - std::fs::write(&temp_path, content)?; - } - let tag = batch.add_dir(dir.path().to_owned()).await?; - assert!(client.has(*tag.hash()).await?); - for (_, content) in &data { - let hash = blake3::hash(content).into(); - let data = client.read_to_bytes(hash).await?; - assert_eq!(data.as_ref(), *content); - } - Ok(()) -} - -/// Tests that add_dir adds the right data -#[tokio::test] -async fn add_dir_single_file() -> anyhow::Result<()> { - let (node, _) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let dir = tempfile::tempdir()?; - let temp_path = dir.path().join("test"); - let data: &[u8] = b"test"; - std::fs::write(&temp_path, data)?; - let tag = batch - .add_dir_with_opts( - temp_path, - AddDirOpts { - wrap: WrapOption::Wrap { name: None }, - ..Default::default() - }, - ) - .await?; - assert!(client.read_to_bytes(*tag.hash()).await.is_ok()); - let hash = blake3::hash(data).into(); - let actual_data = client.read_to_bytes(hash).await?; - assert_eq!(actual_data.as_ref(), data); - Ok(()) -} - -#[tokio::test] -async fn batch_drop() -> anyhow::Result<()> { - let (node, mut gc) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let data: &[u8] = b"test"; - let tag = batch.add_bytes(data).await?; - let hash = *tag.hash(); - // Check that the store has the data and that it is protected from gc - wait_for_gc(&mut gc).await; - assert!(client.has(hash).await?); - drop(batch); - // Check that the store drops the data when the temp tag gets dropped - wait_for_gc(&mut gc).await; - assert!(!client.has(hash).await?); - Ok(()) -} - -/// This checks that dropping a tag makes the data eligible for garbage collection. -/// -/// Note that we might change this behavior in the future and only drop the data -/// once the batch is dropped. -#[tokio::test] -async fn tag_drop_raw() -> anyhow::Result<()> { - let (node, mut gc) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let data: &[u8] = b"test"; - let tag = batch.add_bytes(data).await?; - let hash = *tag.hash(); - // Check that the store has the data and that it is protected from gc - wait_for_gc(&mut gc).await; - assert!(client.has(hash).await?); - drop(tag); - // Check that the store drops the data when the temp tag gets dropped - wait_for_gc(&mut gc).await; - assert!(!client.has(hash).await?); - Ok(()) -} - -/// Tests that data is preserved if a second temp tag is created for it -/// before the first temp tag is dropped. -#[tokio::test] -async fn temp_tag_copy() -> anyhow::Result<()> { - let (node, mut gc) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let data: &[u8] = b"test"; - let tag = batch.add_bytes(data).await?; - let hash = *tag.hash(); - // Check that the store has the data and that it is protected from gc - wait_for_gc(&mut gc).await; - assert!(client.has(hash).await?); - // Create an additional temp tag for the same data - let tag2 = batch.temp_tag(tag.hash_and_format()).await?; - drop(tag); - // Check that the data is still present - wait_for_gc(&mut gc).await; - assert!(client.has(hash).await?); - drop(tag2); - // Check that the data is gone since both temp tags are dropped - wait_for_gc(&mut gc).await; - assert!(!client.has(hash).await?); - Ok(()) -} - -/// Tests that temp tags work properly for hash sequences, using add_dir -/// to add the data. -/// -/// Note that we might change this behavior in the future and only drop the data -/// once the batch is dropped. -#[tokio::test] -async fn tag_drop_hashseq() -> anyhow::Result<()> { - let (node, mut gc) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let dir = tempfile::tempdir()?; - let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")]; - for (name, content) in &data { - let temp_path = dir.path().join(name); - std::fs::write(&temp_path, content)?; - } - let tag = batch.add_dir(dir.path().to_owned()).await?; - let hash = *tag.hash(); - // weird signature to avoid async move issues - let check_present = |present: &'static bool| async { - assert!(client.has(hash).await? == *present); - for (_, content) in &data { - let hash = blake3::hash(content).into(); - assert!(client.has(hash).await? == *present); - } - anyhow::Ok(()) - }; - // Check that the store has the data immediately after adding it - check_present(&true).await?; - // Check that it is protected from gc - wait_for_gc(&mut gc).await; - check_present(&true).await?; - drop(tag); - // Check that the store drops the data when the temp tag gets dropped - wait_for_gc(&mut gc).await; - check_present(&false).await?; - Ok(()) -} - -/// This checks that dropping a tag makes the data eligible for garbage collection. -/// -/// Note that we might change this behavior in the future and only drop the data -/// once the batch is dropped. -#[tokio::test] -async fn wrong_batch() -> anyhow::Result<()> { - let (node, _) = create_node().await?; - let client = &node.client().blobs(); - let batch = client.batch().await?; - let data: &[u8] = b"test"; - let tag = batch.add_bytes(data).await?; - drop(batch); - let batch = client.batch().await?; - assert!(batch.persist(tag).await.is_err()); - Ok(()) -} diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs deleted file mode 100644 index e2aa6bf71cc..00000000000 --- a/iroh/tests/gc.rs +++ /dev/null @@ -1,459 +0,0 @@ -use std::{ - io::{Cursor, Write}, - time::Duration, -}; - -use anyhow::Result; -use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; -use bytes::Bytes; -use iroh::node::{self, Node}; -use iroh_blobs::{ - hashseq::HashSeq, - store::{EntryStatus, MapMut, Store}, - util::Tag, - BlobFormat, HashAndFormat, IROH_BLOCK_SIZE, -}; -use rand::RngCore; - -pub fn create_test_data(size: usize) -> Bytes { - let mut rand = rand::thread_rng(); - let mut res = vec![0u8; size]; - rand.fill_bytes(&mut res); - res.into() -} - -/// Take some data and encode it -pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { - let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE); - let mut encoded = Vec::new(); - encoded - .write_all(outboard.tree.size().to_le_bytes().as_ref()) - .unwrap(); - bao_tree::io::sync::encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded) - .unwrap(); - let hash = outboard.root(); - (hash, Cursor::new(encoded.into())) -} - -/// Wrap a bao store in a node that has gc enabled. -async fn wrap_in_node( - bao_store: S, - gc_period: Duration, -) -> (Node, async_channel::Receiver<()>) -where - S: iroh_blobs::store::Store, -{ - let (gc_send, gc_recv) = async_channel::unbounded(); - let node = node::Builder::with_db_and_store(bao_store, iroh::node::StorageConfig::Mem) - .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) - .register_gc_done_cb(Box::new(move || { - gc_send.send_blocking(()).ok(); - })) - .spawn() - .await - .unwrap(); - (node, gc_recv) -} - -async fn gc_test_node() -> ( - Node, - iroh_blobs::store::mem::Store, - async_channel::Receiver<()>, -) { - let bao_store = iroh_blobs::store::mem::Store::new(); - let (node, gc_recv) = wrap_in_node(bao_store.clone(), Duration::from_millis(500)).await; - (node, bao_store, gc_recv) -} - -async fn step(evs: &async_channel::Receiver<()>) { - // drain the event queue, we want a new GC - while evs.try_recv().is_ok() {} - // wait for several GC cycles - for _ in 0..3 { - evs.recv().await.unwrap(); - } -} - -/// Test the absolute basics of gc, temp tags and tags for blobs. -#[tokio::test] -async fn gc_basics() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let (node, bao_store, evs) = gc_test_node().await; - let data1 = create_test_data(1234); - let tt1 = bao_store.import_bytes(data1, BlobFormat::Raw).await?; - let data2 = create_test_data(5678); - let tt2 = bao_store.import_bytes(data2, BlobFormat::Raw).await?; - let h1 = *tt1.hash(); - let h2 = *tt2.hash(); - // temp tags are still there, so the entries should be there - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - - // drop the first tag, the entry should be gone after some time - drop(tt1); - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - - // create an explicit tag for h1 (as raw) and then delete the temp tag. Entry should still be there. - let tag = Tag::from("test"); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(h2))) - .await?; - drop(tt2); - tracing::info!("dropped tt2"); - step(&evs).await; - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - - // delete the explicit tag, entry should be gone - bao_store.set_tag(tag, None).await?; - step(&evs).await; - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); - - node.shutdown().await?; - Ok(()) -} - -/// Test gc for sequences of hashes that protect their children from deletion. -#[tokio::test] -async fn gc_hashseq_impl() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let (node, bao_store, evs) = gc_test_node().await; - let data1 = create_test_data(1234); - let tt1 = bao_store.import_bytes(data1, BlobFormat::Raw).await?; - let data2 = create_test_data(5678); - let tt2 = bao_store.import_bytes(data2, BlobFormat::Raw).await?; - let seq = vec![*tt1.hash(), *tt2.hash()] - .into_iter() - .collect::(); - let ttr = bao_store - .import_bytes(seq.into_inner(), BlobFormat::HashSeq) - .await?; - let h1 = *tt1.hash(); - let h2 = *tt2.hash(); - let hr = *ttr.hash(); - drop(tt1); - drop(tt2); - - // there is a temp tag for the link seq, so it and its entries should be there - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); - - // make a permanent tag for the link seq, then delete the temp tag. Entries should still be there. - let tag = Tag::from("test"); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(hr))) - .await?; - drop(ttr); - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); - - // change the permanent tag to be just for the linkseq itself as a blob. Only the linkseq should be there, not the entries. - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) - .await?; - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); - assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); - - // delete the permanent tag, everything should be gone - bao_store.set_tag(tag, None).await?; - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); - assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::NotFound); - - node.shutdown().await?; - Ok(()) -} - -#[cfg(feature = "fs-store")] -mod file { - use std::{io, path::PathBuf}; - - use bao_tree::{ - io::fsm::{BaoContentItem, ResponseDecoderNext}, - BaoTree, - }; - use iroh_blobs::{ - store::{BaoBatchWriter, ConsistencyCheckProgress, MapEntryMut, ReportLevel}, - util::progress::{AsyncChannelProgressSender, ProgressSender as _}, - TempTag, - }; - use testdir::testdir; - use tokio::io::AsyncReadExt; - - use super::*; - - fn path(root: PathBuf, suffix: &'static str) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { - move |hash| root.join(format!("{}.{}", hash.to_hex(), suffix)) - } - - fn data_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { - // this assumes knowledge of the internal directory structure of the flat store - path(root.join("data"), "data") - } - - fn outboard_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { - // this assumes knowledge of the internal directory structure of the flat store - path(root.join("data"), "obao4") - } - - async fn check_consistency(store: &impl Store) -> anyhow::Result { - let mut max_level = ReportLevel::Trace; - let (tx, rx) = async_channel::bounded(1); - let task = tokio::task::spawn(async move { - while let Ok(ev) = rx.recv().await { - if let ConsistencyCheckProgress::Update { level, .. } = &ev { - max_level = max_level.max(*level); - } - } - }); - store - .consistency_check(false, AsyncChannelProgressSender::new(tx).boxed()) - .await?; - task.await?; - Ok(max_level) - } - - /// Test gc for sequences of hashes that protect their children from deletion. - #[tokio::test] - async fn gc_file_basics() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let dir = testdir!(); - let path = data_path(dir.clone()); - let outboard_path = outboard_path(dir.clone()); - - let bao_store = iroh_blobs::store::fs::Store::load(dir.clone()).await?; - let (node, evs) = wrap_in_node(bao_store.clone(), Duration::from_millis(100)).await; - let data1 = create_test_data(10000000); - let tt1 = bao_store - .import_bytes(data1.clone(), BlobFormat::Raw) - .await?; - let data2 = create_test_data(1000000); - let tt2 = bao_store - .import_bytes(data2.clone(), BlobFormat::Raw) - .await?; - let seq = vec![*tt1.hash(), *tt2.hash()] - .into_iter() - .collect::(); - let ttr = bao_store - .import_bytes(seq.into_inner(), BlobFormat::HashSeq) - .await?; - - let h1 = *tt1.hash(); - let h2 = *tt2.hash(); - let hr = *ttr.hash(); - - // data is protected by the temp tag - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - // h1 is for a giant file, so we will have both data and outboard files - assert!(path(&h1).exists()); - assert!(outboard_path(&h1).exists()); - // h2 is for a mid sized file, so we will have just the data file - assert!(path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr so small that data will be inlined and outboard will not exist at all - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - drop(tt1); - drop(tt2); - let tag = Tag::from("test"); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash()))) - .await?; - drop(ttr); - - // data is now protected by a normal tag, nothing should be gone - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - // h1 is for a giant file, so we will have both data and outboard files - assert!(path(&h1).exists()); - assert!(outboard_path(&h1).exists()); - // h2 is for a mid sized file, so we will have just the data file - assert!(path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr so small that data will be inlined and outboard will not exist at all - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - tracing::info!("changing tag from hashseq to raw, this should orphan the children"); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) - .await?; - - // now only hr itself should be protected, but not its children - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - // h1 should be gone - assert!(!path(&h1).exists()); - assert!(!outboard_path(&h1).exists()); - // h2 should still not be there - assert!(!path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr should still not be there - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - bao_store.set_tag(tag, None).await?; - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - // h1 should be gone - assert!(!path(&h1).exists()); - assert!(!outboard_path(&h1).exists()); - // h2 should still not be there - assert!(!path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr should still not be there - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - node.shutdown().await?; - - Ok(()) - } - - /// Add a file to the store in the same way a download works. - /// - /// we know the hash in advance, create a partial entry, write the data to it and - /// the outboard file, then commit it to a complete entry. - /// - /// During this time, the partial entry is protected by a temp tag. - async fn simulate_download_partial( - bao_store: &S, - data: Bytes, - ) -> io::Result<(S::EntryMut, TempTag)> { - // simulate the remote side. - let (hash, mut response) = simulate_remote(data.as_ref()); - // simulate the local side. - // we got a hash and a response from the remote side. - let tt = bao_store.temp_tag(HashAndFormat::raw(hash.into())); - // get the size - let size = response.read_u64_le().await?; - // start reading the response - let mut reading = bao_tree::io::fsm::ResponseDecoder::new( - hash, - ChunkRanges::all(), - BaoTree::new(size, IROH_BLOCK_SIZE), - response, - ); - // create the partial entry - let entry = bao_store.get_or_create(hash.into(), size).await?; - // create the - let mut bw = entry.batch_writer().await?; - let mut buf = Vec::new(); - while let ResponseDecoderNext::More((next, res)) = reading.next().await { - let item = res?; - match &item { - BaoContentItem::Parent(_) => { - buf.push(item); - } - BaoContentItem::Leaf(_) => { - buf.push(item); - let batch = std::mem::take(&mut buf); - bw.write_batch(size, batch).await?; - } - } - reading = next; - } - bw.sync().await?; - drop(bw); - Ok((entry, tt)) - } - - async fn simulate_download_complete( - bao_store: &S, - data: Bytes, - ) -> io::Result { - let (entry, tt) = simulate_download_partial(bao_store, data).await?; - // commit the entry - bao_store.insert_complete(entry).await?; - Ok(tt) - } - - /// Test that partial files are deleted. - #[tokio::test] - async fn gc_file_partial() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let dir = testdir!(); - let path = data_path(dir.clone()); - let outboard_path = outboard_path(dir.clone()); - - let bao_store = iroh_blobs::store::fs::Store::load(dir.clone()).await?; - let (node, evs) = wrap_in_node(bao_store.clone(), Duration::from_millis(10)).await; - - let data1: Bytes = create_test_data(10000000); - let (_entry, tt1) = simulate_download_partial(&bao_store, data1.clone()).await?; - drop(_entry); - let h1 = *tt1.hash(); - // partial data and outboard files should be there - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - assert!(path(&h1).exists()); - assert!(outboard_path(&h1).exists()); - - drop(tt1); - // partial data and outboard files should be gone - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - assert!(!path(&h1).exists()); - assert!(!outboard_path(&h1).exists()); - - node.shutdown().await?; - Ok(()) - } - - #[tokio::test] - async fn gc_file_stress() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let dir = testdir!(); - - let bao_store = iroh_blobs::store::fs::Store::load(dir.clone()).await?; - let (node, evs) = wrap_in_node(bao_store.clone(), Duration::from_secs(1)).await; - - let mut deleted = Vec::new(); - let mut live = Vec::new(); - // download - for i in 0..100 { - let data: Bytes = create_test_data(16 * 1024 * 3 + 1); - let tt = simulate_download_complete(&bao_store, data).await.unwrap(); - if i % 100 == 0 { - let tag = Tag::from(format!("test{}", i)); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash()))) - .await?; - live.push(*tt.hash()); - } else { - deleted.push(*tt.hash()); - } - } - step(&evs).await; - - for h in deleted.iter() { - assert_eq!(bao_store.entry_status(h).await?, EntryStatus::NotFound); - assert!(!dir.join(format!("data/{}.data", h.to_hex())).exists()); - } - - for h in live.iter() { - assert_eq!(bao_store.entry_status(h).await?, EntryStatus::Complete); - assert!(dir.join(format!("data/{}.data", h.to_hex())).exists()); - } - - node.shutdown().await?; - Ok(()) - } -} diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs deleted file mode 100644 index eab8b84599d..00000000000 --- a/iroh/tests/provide.rs +++ /dev/null @@ -1,542 +0,0 @@ -use std::{ - collections::BTreeMap, - net::SocketAddr, - ops::Range, - time::{Duration, Instant}, -}; - -use anyhow::{Context, Result}; -use bao_tree::{blake3, ChunkNum, ChunkRanges}; -use bytes::Bytes; -use futures_lite::FutureExt; -use iroh::node::Builder; -use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; -use iroh_blobs::{ - format::collection::Collection, - get::{ - fsm::{self, ConnectedNext, DecodeError}, - Stats, - }, - protocol::{GetRequest, RangeSpecSeq}, - store::{MapMut, Store}, - Hash, -}; -use iroh_net::{defaults::staging::default_relay_map, key::SecretKey, NodeAddr, NodeId}; -use rand::RngCore; - -/// Create a new endpoint and dial a peer, returning the connection. -async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result { - let endpoint = iroh_net::Endpoint::builder() - .secret_key(secret_key) - .bind() - .await?; - endpoint - .connect(peer, iroh::blobs::protocol::ALPN) - .await - .context("failed to connect to provider") -} - -fn test_node(db: D) -> Builder { - iroh::node::Builder::with_db_and_store(db, iroh::node::StorageConfig::Mem).bind_random_port() -} - -#[tokio::test] -async fn basics() -> Result<()> { - let _guard = iroh_test::logging::setup(); - transfer_data(vec![("hello_world", "hello world!".as_bytes().to_vec())]).await -} - -#[tokio::test] -async fn multi_file() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let file_opts = vec![ - ("1", 10), - ("2", 1024), - ("3", 1024 * 1024), - // overkill, but it works! Just annoying to wait for - // ("4", 1024 * 1024 * 90), - ]; - transfer_random_data(file_opts).await -} - -#[tokio::test] -async fn many_files() -> Result<()> { - let _guard = iroh_test::logging::setup(); - let num_files = [10, 100]; - for num in num_files { - println!("NUM_FILES: {num}"); - let file_opts = (0..num) - .map(|i| { - // use a long file name to test large collections - let name = i.to_string().repeat(50); - (name, 10) - }) - .collect(); - transfer_random_data(file_opts).await?; - } - Ok(()) -} - -#[tokio::test] -async fn sizes() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let sizes = [ - 0, - 10, - 100, - 1024, - 1024 * 100, - 1024 * 500, - 1024 * 1024, - 1024 * 1024 + 10, - 1024 * 1024 * 9, - ]; - - for size in sizes { - let now = Instant::now(); - transfer_random_data(vec![("hello_world", size)]).await?; - println!(" took {}ms", now.elapsed().as_millis()); - } - - Ok(()) -} - -#[tokio::test] -async fn empty_files() -> Result<()> { - // try to transfer as many files as possible without hitting a limit - // booo 400 is too small :( - let num_files = 400; - let mut file_opts = Vec::new(); - for i in 0..num_files { - file_opts.push((i.to_string(), 0)); - } - transfer_random_data(file_opts).await -} - -/// Create new get options with the given node id and addresses, using a -/// randomly generated secret key. -fn get_options( - node_id: NodeId, - addrs: impl IntoIterator, -) -> (SecretKey, NodeAddr) { - let relay_map = default_relay_map(); - let peer = iroh_net::NodeAddr::from_parts( - node_id, - relay_map.nodes().next().map(|n| n.url.clone()), - addrs, - ); - (SecretKey::generate(), peer) -} - -#[tokio::test(flavor = "multi_thread")] -async fn multiple_clients() -> Result<()> { - let content = b"hello world!"; - - let mut db = iroh_blobs::store::readonly_mem::Store::default(); - let expect_hash = db.insert(content.as_slice()); - let expect_name = "hello_world"; - let collection = Collection::from_iter([(expect_name, expect_hash)]); - let hash = db.insert_many(collection.to_blobs()).unwrap(); - let node = test_node(db).spawn().await?; - let mut tasks = Vec::new(); - for _i in 0..3 { - let file_hash: Hash = expect_hash; - let name = expect_name; - let addrs = node.local_address(); - let peer_id = node.node_id(); - let content = content.to_vec(); - - tasks.push(node.local_pool_handle().spawn(move || { - async move { - let (secret_key, peer) = get_options(peer_id, addrs); - let expected_data = &content; - let expected_name = name; - let request = GetRequest::all(hash); - let (collection, children, _stats) = - run_collection_get_request(secret_key, peer, request).await?; - assert_eq!(expected_name, &collection[0].0); - assert_eq!(&file_hash, &collection[0].1); - assert_eq!(expected_data, &children[&0]); - - anyhow::Ok(()) - } - .boxed_local() - })); - } - - futures_buffered::try_join_all(tasks).await?; - Ok(()) -} - -// Run the test creating random data for each blob, using the size specified by the file -// options -async fn transfer_random_data(file_opts: Vec<(S, usize)>) -> Result<()> -where - S: Into + std::fmt::Debug + std::cmp::PartialEq + Clone, -{ - let file_opts = file_opts - .into_iter() - .map(|(name, size)| { - let mut content = vec![0u8; size]; - rand::thread_rng().fill_bytes(&mut content); - (name, content) - }) - .collect(); - transfer_data(file_opts).await -} - -// Run the test for a vec of filenames and blob data -async fn transfer_data(file_opts: Vec<(S, Vec)>) -> Result<()> -where - S: Into + std::fmt::Debug + std::cmp::PartialEq + Clone, -{ - let mut expects = Vec::new(); - let num_blobs = file_opts.len(); - - let (mut mdb, _lookup) = iroh_blobs::store::readonly_mem::Store::new(file_opts.clone()); - let mut blobs = Vec::new(); - - for opt in file_opts.into_iter() { - let (name, data) = opt; - let name: String = name.into(); - println!("Sending {}: {}b", name, data.len()); - - // get expected hash of file - let hash = blake3::hash(&data); - let hash = Hash::from(hash); - let blob = (name.clone(), hash); - blobs.push(blob); - - // keep track of expected values - expects.push((name, hash)); - } - let collection_orig = Collection::from_iter(blobs); - let collection_hash = mdb.insert_many(collection_orig.to_blobs()).unwrap(); - - let node = test_node(mdb.clone()).spawn().await?; - - let addrs = node.local_endpoint_addresses().await?; - let (secret_key, peer) = get_options(node.node_id(), addrs); - let request = GetRequest::all(collection_hash); - let (collection, children, _stats) = - run_collection_get_request(secret_key, peer, request).await?; - assert_eq!(num_blobs, collection.len()); - for (i, (expected_name, expected_hash)) in expects.iter().enumerate() { - let (name, hash) = &collection[i]; - let got = &children[&(i as u64)]; - let expected = mdb.get_content(expected_hash).unwrap(); - assert_eq!(expected_name, name); - assert_eq!(expected_hash, hash); - assert_eq!(expected, got); - } - - node.shutdown().await?; - - Ok(()) -} - -#[tokio::test] -async fn test_server_close() { - let _guard = iroh_test::logging::setup(); - - // Prepare a Provider transferring a file. - let mut db = iroh_blobs::store::readonly_mem::Store::default(); - let child_hash = db.insert(b"hello there"); - let collection = Collection::from_iter([("hello", child_hash)]); - let hash = db.insert_many(collection.to_blobs()).unwrap(); - let node = test_node(db).spawn().await.unwrap(); - let node_addr = node.local_endpoint_addresses().await.unwrap(); - let peer_id = node.node_id(); - - let (secret_key, peer) = get_options(peer_id, node_addr); - let request = GetRequest::all(hash); - let (_collection, _children, _stats) = run_collection_get_request(secret_key, peer, request) - .await - .unwrap(); -} - -/// create an in memory test database containing the given entries and an iroh collection of all entries -/// -/// returns the database and the root hash of the collection -fn create_test_db( - entries: impl IntoIterator, impl AsRef<[u8]>)>, -) -> (iroh_blobs::store::readonly_mem::Store, Hash) { - let (mut db, hashes) = iroh_blobs::store::readonly_mem::Store::new(entries); - let collection = Collection::from_iter(hashes); - let hash = db.insert_many(collection.to_blobs()).unwrap(); - (db, hash) -} - -#[tokio::test] -#[ignore = "flaky"] -async fn test_ipv6() { - let _guard = iroh_test::logging::setup(); - - let (db, hash) = create_test_db([("test", b"hello")]); - let node = match test_node(db).spawn().await { - Ok(provider) => provider, - Err(_) => { - // We assume the problem here is IPv6 on this host. If the problem is - // not IPv6 then other tests will also fail. - return; - } - }; - let addrs = node.local_endpoint_addresses().await.unwrap(); - let peer_id = node.node_id(); - tokio::time::timeout(Duration::from_secs(10), async move { - let (secret_key, peer) = get_options(peer_id, addrs); - let request = GetRequest::all(hash); - run_collection_get_request(secret_key, peer, request).await - }) - .await - .expect("timeout") - .expect("get failed"); -} - -/// Simulate a node that has nothing -#[tokio::test] -async fn test_not_found() { - let _guard = iroh_test::logging::setup(); - - let db = iroh_blobs::store::readonly_mem::Store::default(); - let hash = blake3::hash(b"hello").into(); - let node = match test_node(db).spawn().await { - Ok(provider) => provider, - Err(_) => { - // We assume the problem here is IPv6 on this host. If the problem is - // not IPv6 then other tests will also fail. - return; - } - }; - let addrs = node.local_endpoint_addresses().await.unwrap(); - let peer_id = node.node_id(); - tokio::time::timeout(Duration::from_secs(10), async move { - let (secret_key, peer) = get_options(peer_id, addrs); - let request = GetRequest::single(hash); - let res = run_collection_get_request(secret_key, peer, request).await; - if let Err(cause) = res { - if let Some(e) = cause.downcast_ref::() { - if let DecodeError::NotFound = e { - Ok(()) - } else { - anyhow::bail!("expected DecodeError::NotFound, got {:?}", e); - } - } else { - anyhow::bail!("expected DecodeError, got {:?}", cause); - } - } else { - anyhow::bail!("expected error when getting non-existent blob"); - } - }) - .await - .expect("timeout") - .expect("get failed"); -} - -/// Simulate a node that has just begun downloading a blob, but does not yet have any data -#[tokio::test] -async fn test_chunk_not_found_1() { - let _guard = iroh_test::logging::setup(); - - let db = iroh_blobs::store::mem::Store::new(); - let data = (0..1024 * 64).map(|i| i as u8).collect::>(); - let hash = blake3::hash(&data).into(); - let _entry = db.get_or_create(hash, data.len() as u64).await.unwrap(); - let node = match test_node(db).spawn().await { - Ok(provider) => provider, - Err(_) => { - // We assume the problem here is IPv6 on this host. If the problem is - // not IPv6 then other tests will also fail. - return; - } - }; - let addrs = node.local_endpoint_addresses().await.unwrap(); - let peer_id = node.node_id(); - tokio::time::timeout(Duration::from_secs(10), async move { - let (secret_key, peer) = get_options(peer_id, addrs); - let request = GetRequest::single(hash); - let res = run_collection_get_request(secret_key, peer, request).await; - if let Err(cause) = res { - if let Some(e) = cause.downcast_ref::() { - if let DecodeError::NotFound = e { - Ok(()) - } else { - anyhow::bail!("expected DecodeError::ParentNotFound, got {:?}", e); - } - } else { - anyhow::bail!("expected DecodeError, got {:?}", cause); - } - } else { - anyhow::bail!("expected error when getting non-existent blob"); - } - }) - .await - .expect("timeout") - .expect("get failed"); -} - -#[tokio::test] -async fn test_run_ticket() { - let _guard = iroh_test::logging::setup(); - - let (db, hash) = create_test_db([("test", b"hello")]); - let node = test_node(db).spawn().await.unwrap(); - let _drop_guard = node.cancel_token().drop_guard(); - - let mut addr = node.net().node_addr().await.unwrap(); - addr.apply_options(AddrInfoOptions::RelayAndAddresses); - let ticket = BlobTicket::new(addr, hash, iroh_blobs::BlobFormat::HashSeq) - .expect("ticket creation failed"); - - tokio::time::timeout(Duration::from_secs(10), async move { - let request = GetRequest::all(hash); - run_collection_get_request(SecretKey::generate(), ticket.node_addr().clone(), request).await - }) - .await - .expect("timeout") - .expect("get ticket failed"); -} - -/// Utility to validate that the children of a collection are correct -fn validate_children(collection: Collection, children: BTreeMap) -> anyhow::Result<()> { - let blobs = collection.into_iter().collect::>(); - anyhow::ensure!(blobs.len() == children.len()); - for (child, (_name, hash)) in blobs.into_iter().enumerate() { - let child = child as u64; - let data = children.get(&child).unwrap(); - anyhow::ensure!(hash == blake3::hash(data).into()); - } - Ok(()) -} - -async fn run_collection_get_request( - secret_key: SecretKey, - peer: NodeAddr, - request: GetRequest, -) -> anyhow::Result<(Collection, BTreeMap, Stats)> { - let connection = dial(secret_key, peer).await?; - let initial = fsm::start(connection, request); - let connected = initial.next().await?; - let ConnectedNext::StartRoot(fsm_at_start_root) = connected.next().await? else { - anyhow::bail!("request did not include collection"); - }; - Collection::read_fsm_all(fsm_at_start_root).await -} - -#[tokio::test] -async fn test_run_fsm() { - let _guard = iroh_test::logging::setup(); - - let (db, hash) = create_test_db([("a", b"hello"), ("b", b"world")]); - let node = test_node(db).spawn().await.unwrap(); - let addrs = node.local_endpoint_addresses().await.unwrap(); - let peer_id = node.node_id(); - tokio::time::timeout(Duration::from_secs(10), async move { - let (secret_key, peer) = get_options(peer_id, addrs); - let request = GetRequest::all(hash); - let (collection, children, _) = - run_collection_get_request(secret_key, peer, request).await?; - validate_children(collection, children)?; - anyhow::Ok(()) - }) - .await - .expect("timeout") - .expect("get failed"); -} - -/// compute the range of the last chunk of a blob of the given size -fn last_chunk_range(size: usize) -> Range { - const CHUNK_LEN: usize = 1024; - const MASK: usize = CHUNK_LEN - 1; - if (size & MASK) == 0 { - size - CHUNK_LEN..size - } else { - (size & !MASK)..size - } -} - -fn last_chunk(data: &[u8]) -> &[u8] { - let range = last_chunk_range(data.len()); - &data[range] -} - -fn make_test_data(n: usize) -> Vec { - let mut data = Vec::with_capacity(n); - for i in 0..n { - data.push((i / 1024) as u8); - } - data -} - -/// Ask for the last chunk of a blob, even if we don't know the size yet. -/// -/// The verified last chunk also verifies the size. -#[tokio::test] -async fn test_size_request_blob() { - let _guard = iroh_test::logging::setup(); - - let expected = make_test_data(1024 * 64 + 1234); - let last_chunk = last_chunk(&expected); - let (db, hashes) = iroh_blobs::store::readonly_mem::Store::new([("test", &expected)]); - let hash = Hash::from(*hashes.values().next().unwrap()); - let node = test_node(db).spawn().await.unwrap(); - let addrs = node.local_endpoint_addresses().await.unwrap(); - let peer_id = node.node_id(); - tokio::time::timeout(Duration::from_secs(10), async move { - let request = GetRequest::last_chunk(hash); - let (secret_key, peer) = get_options(peer_id, addrs); - let connection = dial(secret_key, peer).await?; - let response = fsm::start(connection, request); - let connected = response.next().await?; - let ConnectedNext::StartRoot(start) = connected.next().await? else { - panic!() - }; - let header = start.next(); - let (_, actual) = header.concatenate_into_vec().await?; - assert_eq!(actual, last_chunk); - anyhow::Ok(()) - }) - .await - .expect("timeout") - .expect("get failed"); -} - -#[tokio::test] -async fn test_collection_stat() { - let _guard = iroh_test::logging::setup(); - - let child1 = make_test_data(123456); - let child2 = make_test_data(345678); - let (db, hash) = create_test_db([("a", &child1), ("b", &child2)]); - let node = test_node(db.clone()).spawn().await.unwrap(); - let addrs = node.local_endpoint_addresses().await.unwrap(); - let peer_id = node.node_id(); - tokio::time::timeout(Duration::from_secs(10), async move { - // first 1024 bytes - let header = ChunkRanges::from(..ChunkNum(1)); - // last chunk, whatever it is, to verify the size - let end = ChunkRanges::from(ChunkNum(u64::MAX)..); - // combine them - let ranges = &header | &end; - let request = GetRequest::new( - hash, - RangeSpecSeq::from_ranges_infinite([ChunkRanges::all(), ranges]), - ); - let (secret_key, peer) = get_options(peer_id, addrs); - let (_collection, items, _stats) = - run_collection_get_request(secret_key, peer, request).await?; - // we should get the first <=1024 bytes and the last chunk of each child - // so now we know the size and can guess the type by inspecting the header - assert_eq!(items.len(), 2); - assert_eq!(&items[&0][..1024], &child1[..1024]); - assert!(items[&0].ends_with(last_chunk(&child1))); - assert_eq!(&items[&1][..1024], &child2[..1024]); - assert!(items[&1].ends_with(last_chunk(&child2))); - anyhow::Ok(()) - }) - .await - .expect("timeout") - .expect("get failed"); -}