Skip to content

Commit

Permalink
feat: Add iroh-router support & optional in-memory rpc client (#2)
Browse files Browse the repository at this point in the history
* feat: Add an easy way to run an RPC server/get an in-memory client

* Use iroh-router
  • Loading branch information
matheus23 authored Nov 15, 2024
1 parent ff9985a commit 0de31dd
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 186 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ iroh-blobs = { version = "0.28.0" }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.28.0", optional = true }
iroh-net = { version = "0.28.0" }
iroh-router = "0.28.0"
meadowcap = "0.1.0"
nested_enum_utils = "0.1.0"
postcard = { version = "1", default-features = false, features = [ "alloc", "use-std", "experimental-derive", ] }
quic-rpc = "0.15.0"
quic-rpc = "0.15.1"
quic-rpc-derive = "0.15.0"
rand = "0.8.5"
rand_core = "0.6.4"
Expand Down
42 changes: 39 additions & 3 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
//! Engine for driving a willow store and synchronisation sessions.

use std::sync::{Arc, OnceLock};

use anyhow::Result;
use futures_lite::future::Boxed;
use futures_util::{
future::{MapErr, Shared},
FutureExt, TryFutureExt,
};
use iroh_net::{endpoint::Connection, Endpoint, NodeId};
use iroh_net::{
endpoint::{Connecting, Connection},
Endpoint, NodeId,
};
use iroh_router::ProtocolHandler;
use tokio::{
sync::{mpsc, oneshot},
task::JoinError,
Expand All @@ -14,6 +21,7 @@ use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, error_span, Instrument};

use crate::{
rpc::{client::MemClient, handler::RpcHandler},
session::{
intents::{Intent, IntentHandle},
SessionInit,
Expand All @@ -39,6 +47,7 @@ const PEER_MANAGER_INBOX_CAP: usize = 128;
#[derive(Debug, Clone)]
pub struct Engine {
actor_handle: ActorHandle,
pub(crate) endpoint: Endpoint,
peer_manager_inbox: mpsc::Sender<peer_manager::Input>,
// `Engine` needs to be `Clone + Send`, and we need to `task.await` in its `shutdown()` impl.
// So we need
Expand All @@ -47,11 +56,20 @@ pub struct Engine {
// - `AbortOnDropHandle` to make sure that the `task` is cancelled when all `Node`s are dropped
// (`Shared` acts like an `Arc` around its inner future).
peer_manager_task: Shared<MapErr<AbortOnDropHandle<Result<(), String>>, JoinErrToStr>>,
rpc_handler: Arc<OnceLock<crate::rpc::handler::RpcHandler>>,
}

pub(crate) type JoinErrToStr = Box<dyn Fn(JoinError) -> String + Send + Sync + 'static>;

impl Engine {
/// Get an in memory client to interact with the willow engine.
pub fn client(&self) -> &MemClient {
&self
.rpc_handler
.get_or_init(|| RpcHandler::new(self.clone()))
.client
}

/// Start the Willow engine.
///
/// This needs an `endpoint` to connect to other peers, and a `create_store` closure which
Expand All @@ -74,8 +92,12 @@ impl Engine {
let me = endpoint.node_id();
let actor_handle = ActorHandle::spawn(create_store, me);
let (pm_inbox_tx, pm_inbox_rx) = mpsc::channel(PEER_MANAGER_INBOX_CAP);
let peer_manager =
PeerManager::new(actor_handle.clone(), endpoint, pm_inbox_rx, accept_opts);
let peer_manager = PeerManager::new(
actor_handle.clone(),
endpoint.clone(),
pm_inbox_rx,
accept_opts,
);
let peer_manager_task = tokio::task::spawn(
async move { peer_manager.run().await.map_err(|e| e.to_string()) }
.instrument(error_span!("peer_manager", me=%me.fmt_short())),
Expand All @@ -85,8 +107,10 @@ impl Engine {
.shared();
Engine {
actor_handle,
endpoint,
peer_manager_inbox: pm_inbox_tx,
peer_manager_task,
rpc_handler: Default::default(),
}
}

Expand Down Expand Up @@ -148,3 +172,15 @@ impl std::ops::Deref for Engine {
&self.actor_handle
}
}

impl ProtocolHandler for Engine {
fn accept(self: Arc<Self>, conn: Connecting) -> Boxed<Result<()>> {
Box::pin(async move { self.handle_connection(conn.await?).await })
}

fn shutdown(self: Arc<Self>) -> Boxed<()> {
Box::pin(async move {
(&**self).shutdown().await.ok();
})
}
}
44 changes: 27 additions & 17 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,43 @@ use crate::{
store::traits::{StoreEvent, SubscribeParams},
};

/// Type alias for a memory-backed client.
pub type MemClient = Client<
quic_rpc::transport::flume::FlumeConnector<
crate::rpc::proto::Response,
crate::rpc::proto::Request,
>,
>;

/// Iroh Willow client.
#[derive(Debug, Clone, RefCast)]
#[repr(transparent)]
pub struct Client<C: quic_rpc::Connector<RpcService> = quic_rpc::client::BoxedConnector<RpcService>>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
pub(super) rpc: RpcClient<C>,
}

impl<C: quic_rpc::Connector<RpcService>> Client<C>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
impl<C: quic_rpc::Connector<RpcService>> Client<C> {
pub fn new(rpc: RpcClient<C>) -> Self {
Self { rpc }
}

pub fn boxed(self) -> Client<quic_rpc::client::BoxedConnector<RpcService>>
where
C: quic_rpc::transport::boxed::BoxableConnector<
crate::rpc::proto::Response,
crate::rpc::proto::Request,
>,
{
Client {
rpc: self.rpc.boxed(),
}
}

/// Create a new namespace in the Willow store.
pub async fn create(&self, kind: NamespaceKind, owner: UserId) -> Result<Space<C>> {
let req = CreateNamespaceRequest { kind, owner };
let res = self.rpc.rpc(req).await??;
let res = self.rpc.rpc(req).await.map_err(|e| anyhow::anyhow!(e))??;
Ok(Space::new(self.rpc.clone(), res.0))
}

Expand Down Expand Up @@ -138,11 +153,11 @@ where
let req = SyncWithPeerRequest { peer, init };
let (update_tx, event_rx) = self.rpc.bidi(req).await?;

let update_tx = SinkExt::with(
update_tx,
|update| async move { Ok(SyncWithPeerUpdate(update)) },
let update_tx: UpdateSender = Box::pin(
update_tx
.with(|update| async move { Ok(SyncWithPeerUpdate(update)) })
.sink_map_err(|e: <C as ConnectionErrors>::SendError| e.into()),
);
let update_tx: UpdateSender = Box::pin(update_tx);

let event_rx = Box::pin(event_rx.map(|res| match res {
Ok(Ok(SyncWithPeerResponse::Event(event))) => event,
Expand Down Expand Up @@ -187,17 +202,12 @@ where
/// A space to store entries in.
#[derive(Debug, Clone)]
pub struct Space<C: quic_rpc::Connector<RpcService> = quic_rpc::client::BoxedConnector<RpcService>>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
rpc: RpcClient<C>,
namespace_id: NamespaceId,
}

impl<C: quic_rpc::Connector<RpcService>> Space<C>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
impl<C: quic_rpc::Connector<RpcService>> Space<C> {
fn new(rpc: RpcClient<C>, namespace_id: NamespaceId) -> Self {
Self { rpc, namespace_id }
}
Expand Down
50 changes: 38 additions & 12 deletions src/rpc/handler.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
use anyhow::Result;
use futures_lite::Stream;
use futures_util::{SinkExt, StreamExt};
use iroh_net::Endpoint;
use quic_rpc::server::{ChannelTypes, RpcChannel, RpcServerError};
use quic_rpc::{
server::{ChannelTypes, RpcChannel, RpcServerError},
RpcClient, RpcServer,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::task::AbortOnDropHandle;

use crate::{form::EntryOrForm, rpc::proto::*, Engine};

fn map_err(err: anyhow::Error) -> RpcError {
RpcError::new(&*err)
}
use crate::{
form::EntryOrForm,
rpc::{client::MemClient, proto::*},
Engine,
};

impl Engine {
pub async fn handle_spaces_request<C: ChannelTypes<RpcService>>(
self,
endpoint: Endpoint,
msg: Request,
chan: RpcChannel<RpcService, C>,
) -> Result<(), RpcServerError<C>> {
Expand Down Expand Up @@ -165,15 +167,15 @@ impl Engine {
.await
}
Addr(msg) => {
chan.rpc(msg, endpoint, |endpoint, _req| async move {
let addr = endpoint.node_addr().await.map_err(map_err)?;
chan.rpc(msg, self, |engine, _req| async move {
let addr = engine.endpoint.node_addr().await.map_err(map_err)?;
Ok(addr)
})
.await
}
AddAddr(msg) => {
chan.rpc(msg, endpoint, |endpoint, req| async move {
endpoint.add_node_addr(req.addr).map_err(map_err)?;
chan.rpc(msg, self, |engine, req| async move {
engine.endpoint.add_node_addr(req.addr).map_err(map_err)?;
Ok(())
})
.await
Expand All @@ -182,6 +184,26 @@ impl Engine {
}
}

#[derive(derive_more::Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
#[debug("MemClient")]
pub(crate) client: MemClient,
/// Handler task
pub(crate) _handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
pub(crate) fn new(engine: Engine) -> Self {
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = MemClient::new(RpcClient::new(connector));
let _handler = listener
.spawn_accept_loop(move |req, chan| engine.clone().handle_spaces_request(req, chan));
Self { client, _handler }
}
}

// TODO: Try to use the streams directly instead of spawning two tasks.
async fn sync_with_peer(
engine: &Engine,
Expand Down Expand Up @@ -214,3 +236,7 @@ async fn sync_with_peer(
});
Ok(())
}

fn map_err(err: anyhow::Error) -> RpcError {
RpcError::new(&*err)
}
Loading

0 comments on commit 0de31dd

Please sign in to comment.