Skip to content

Commit

Permalink
feat!: simplify LocalPool handling (#47)
Browse files Browse the repository at this point in the history
## Description

It has been a footgun for a few users, including myself, to make sure to
keep around the `LocalPool`. This changes the behaviour to construct a
`LocalPool` and keep it around by default. If necessary, in the builder
one can provide a custom handle, if there is the need for a custom pool.

## Breaking Changes

- remove `net_protocol::Blobs::new`, use the builder instead
- remove the `LocalPoolHandle` argument from
`net_protocol::Builder::build`
  • Loading branch information
dignifiedquire authored Jan 21, 2025
1 parent 5cacccb commit b29991d
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 71 deletions.
11 changes: 2 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,18 @@ Here is a basic example of how to set up `iroh-blobs` with `iroh`:

```rust
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool};
use iroh_blobs::net_protocol::Blobs;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// create an iroh endpoint that includes the standard discovery mechanisms
// we've built at number0
let endpoint = Endpoint::builder().discovery_n0().bind().await?;

// spawn a local pool with one thread per CPU
// for a single threaded pool use `LocalPool::single`
let local_pool = LocalPool::default();

// create an in-memory blob store
// use `iroh_blobs::net_protocol::Blobs::persistent` to load or create a
// persistent blob store from a path
let blobs = Blobs::memory().build(local_pool.handle(), &endpoint);
let blobs = Blobs::memory().build(&endpoint);

// turn on the "rpc" feature if you need to create blobs and tags clients
let blobs_client = blobs.client();
Expand All @@ -60,9 +56,7 @@ async fn main() -> anyhow::Result<()> {
.await?;

// do fun stuff with the blobs protocol!
// make sure not to drop the local_pool before you are finished
router.shutdown().await?;
drop(local_pool);
drop(tags_client);
Ok(())
}
Expand All @@ -89,4 +83,3 @@ at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in this project by you, as defined in the Apache-2.0 license,
shall be dual licensed as above, without any additional terms or conditions.

7 changes: 2 additions & 5 deletions examples/custom-protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ use iroh::{
protocol::{ProtocolHandler, Router},
Endpoint, NodeId,
};
use iroh_blobs::{
net_protocol::Blobs, rpc::client::blobs::MemClient, util::local_pool::LocalPool, Hash,
};
use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::MemClient, Hash};
use tracing_subscriber::{prelude::*, EnvFilter};

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -89,8 +87,7 @@ async fn main() -> Result<()> {
// Build a in-memory node. For production code, you'd want a persistent node instead usually.
let endpoint = Endpoint::builder().bind().await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let blobs_client = blobs.client();

Expand Down
7 changes: 2 additions & 5 deletions examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::{env, str::FromStr};

use anyhow::{bail, ensure, Context, Result};
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::{
net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool, BlobFormat,
};
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, BlobFormat};
use tracing_subscriber::{prelude::*, EnvFilter};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
Expand Down Expand Up @@ -39,8 +37,7 @@ async fn main() -> Result<()> {
// create a new node
let endpoint = Endpoint::builder().bind().await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let node = builder.spawn().await?;
let blobs_client = blobs.client();
Expand Down
5 changes: 2 additions & 3 deletions examples/hello-world-provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! run this example from the project root:
//! $ cargo run --example hello-world-provide
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool};
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket};
use tracing_subscriber::{prelude::*, EnvFilter};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
Expand All @@ -24,8 +24,7 @@ async fn main() -> anyhow::Result<()> {
// create a new node
let endpoint = Endpoint::builder().bind().await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let blobs_client = blobs.client();
let node = builder.spawn().await?;
Expand Down
7 changes: 2 additions & 5 deletions examples/local-swarm-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use iroh::{
discovery::local_swarm_discovery::LocalSwarmDiscovery, protocol::Router, Endpoint, NodeAddr,
PublicKey, RelayMode, SecretKey,
};
use iroh_blobs::{
net_protocol::Blobs, rpc::client::blobs::WrapOption, util::local_pool::LocalPool, Hash,
};
use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::WrapOption, Hash};
use tracing_subscriber::{prelude::*, EnvFilter};

use self::progress::show_download_progress;
Expand Down Expand Up @@ -73,8 +71,7 @@ async fn main() -> anyhow::Result<()> {
.bind()
.await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let node = builder.spawn().await?;
let blobs_client = blobs.client();
Expand Down
7 changes: 2 additions & 5 deletions examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ use iroh_blobs::{
net_protocol::Blobs,
rpc::client::blobs::{ReadAtLen, WrapOption},
ticket::BlobTicket,
util::{local_pool::LocalPool, SetTagOption},
util::SetTagOption,
};

#[tokio::main]
async fn main() -> Result<()> {
// Create an endpoint, it allows creating and accepting
// connections in the iroh p2p world
let endpoint = Endpoint::builder().discovery_n0().bind().await?;

// We initialize the Blobs protocol in-memory
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(&local_pool, &endpoint);
let blobs = Blobs::memory().build(&endpoint);

// Now we build a router that accepts blobs connections & routes them
// to the blobs protocol.
Expand Down Expand Up @@ -85,7 +83,6 @@ async fn main() -> Result<()> {
// Gracefully shut down the node
println!("Shutting down.");
node.shutdown().await?;
local_pool.shutdown().await;

Ok(())
}
58 changes: 49 additions & 9 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
// TODO: reduce API surface and add documentation
#![allow(missing_docs)]

use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc};
use std::{
collections::BTreeSet,
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
};

use anyhow::{bail, Result};
use futures_lite::future::Boxed as BoxedFuture;
Expand All @@ -17,7 +22,7 @@ use crate::{
provider::EventSender,
store::GcConfig,
util::{
local_pool::{self, LocalPoolHandle},
local_pool::{self, LocalPool, LocalPoolHandle},
SetTagOption,
},
BlobFormat, Hash,
Expand All @@ -41,9 +46,26 @@ impl Default for GcState {
}
}

#[derive(Debug)]
enum Rt {
Handle(LocalPoolHandle),
Owned(LocalPool),
}

impl Deref for Rt {
type Target = LocalPoolHandle;

fn deref(&self) -> &Self::Target {
match self {
Self::Handle(ref handle) => handle,
Self::Owned(ref pool) => pool.handle(),
}
}
}

#[derive(Debug)]
pub(crate) struct BlobsInner<S> {
pub(crate) rt: LocalPoolHandle,
rt: Rt,
pub(crate) store: S,
events: EventSender,
pub(crate) downloader: Downloader,
Expand All @@ -53,6 +75,12 @@ pub(crate) struct BlobsInner<S> {
pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
}

impl<S> BlobsInner<S> {
pub(crate) fn rt(&self) -> &LocalPoolHandle {
&self.rt
}
}

#[derive(Debug, Clone)]
pub struct Blobs<S> {
pub(crate) inner: Arc<BlobsInner<S>>,
Expand Down Expand Up @@ -119,6 +147,7 @@ impl BlobBatches {
pub struct Builder<S> {
store: S,
events: Option<EventSender>,
rt: Option<LocalPoolHandle>,
}

impl<S: crate::store::Store> Builder<S> {
Expand All @@ -128,13 +157,23 @@ impl<S: crate::store::Store> Builder<S> {
self
}

/// Set a custom `LocalPoolHandle` to use.
pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
self.rt = Some(rt);
self
}

/// Build the Blobs protocol handler.
/// You need to provide a local pool handle and an endpoint.
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Blobs<S> {
/// You need to provide a the endpoint.
pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
let rt = self
.rt
.map(Rt::Handle)
.unwrap_or_else(|| Rt::Owned(LocalPool::default()));
let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
Blobs::new(
self.store,
rt.clone(),
rt,
self.events.unwrap_or_default(),
downloader,
endpoint.clone(),
Expand All @@ -148,6 +187,7 @@ impl<S> Blobs<S> {
Builder {
store,
events: None,
rt: None,
}
}
}
Expand All @@ -169,9 +209,9 @@ impl Blobs<crate::store::fs::Store> {
}

impl<S: crate::store::Store> Blobs<S> {
pub fn new(
fn new(
store: S,
rt: LocalPoolHandle,
rt: Rt,
events: EventSender,
downloader: Downloader,
endpoint: Endpoint,
Expand Down Expand Up @@ -201,7 +241,7 @@ impl<S: crate::store::Store> Blobs<S> {
}

pub fn rt(&self) -> &LocalPoolHandle {
&self.inner.rt
self.inner.rt()
}

pub fn downloader(&self) -> &Downloader {
Expand Down
2 changes: 1 addition & 1 deletion src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<D: crate::store::Store> Handler<D> {
}

fn rt(&self) -> &LocalPoolHandle {
&self.0.rt
self.0.rt()
}

fn endpoint(&self) -> &Endpoint {
Expand Down
17 changes: 3 additions & 14 deletions src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,11 +1015,9 @@ mod tests {

use super::RpcService;
use crate::{
downloader::Downloader,
net_protocol::Blobs,
provider::{CustomEventSender, EventSender},
rpc::client::{blobs, tags},
util::local_pool::LocalPool,
};

type RpcClient = quic_rpc::RpcClient<RpcService>;
Expand All @@ -1029,7 +1027,6 @@ mod tests {
pub struct Node {
router: iroh::protocol::Router,
client: RpcClient,
_local_pool: LocalPool,
_rpc_task: AbortOnDropHandle<()>,
}

Expand Down Expand Up @@ -1067,19 +1064,12 @@ mod tests {
.unwrap_or_else(|| Endpoint::builder().discovery_n0())
.bind()
.await?;
let local_pool = LocalPool::single();
let mut router = Router::builder(endpoint.clone());

// Setup blobs
let downloader =
Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
let blobs = Blobs::new(
store.clone(),
local_pool.handle().clone(),
events,
downloader,
endpoint.clone(),
);
let blobs = Blobs::builder(store.clone())
.events(events)
.build(&endpoint);
router = router.accept(crate::ALPN, blobs.clone());

// Build the router
Expand All @@ -1096,7 +1086,6 @@ mod tests {
router,
client,
_rpc_task,
_local_pool: local_pool,
})
}
}
Expand Down
8 changes: 3 additions & 5 deletions tests/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use std::{
};

use iroh::Endpoint;
use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool};
use iroh_blobs::{net_protocol::Blobs, store::GcConfig};
use testresult::TestResult;

#[tokio::test]
async fn blobs_gc_smoke() -> TestResult<()> {
let pool = LocalPool::default();
let endpoint = Endpoint::builder().bind().await?;
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
let blobs = Blobs::memory().build(&endpoint);
let client = blobs.client();
blobs.start_gc(GcConfig {
period: Duration::from_millis(1),
Expand All @@ -29,9 +28,8 @@ async fn blobs_gc_smoke() -> TestResult<()> {

#[tokio::test]
async fn blobs_gc_protected() -> TestResult<()> {
let pool = LocalPool::default();
let endpoint = Endpoint::builder().bind().await?;
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
let blobs = Blobs::memory().build(&endpoint);
let client = blobs.client();
let h1 = client.add_bytes(b"test".to_vec()).await?;
let protected = Arc::new(Mutex::new(Vec::new()));
Expand Down
Loading

0 comments on commit b29991d

Please sign in to comment.