Skip to content

Commit

Permalink
Simplify test node setup
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Nov 21, 2024
1 parent 0f10a98 commit 815e7f2
Showing 1 changed file with 19 additions and 51 deletions.
70 changes: 19 additions & 51 deletions src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,12 +1005,14 @@ mod tests {
//! An iroh node that just has the blobs transport
use std::{path::Path, sync::Arc};

use iroh_net::{NodeAddr, NodeId};
use quic_rpc::transport::{Connector, Listener};
use iroh_net::{Endpoint, NodeAddr, NodeId};
use iroh_router::Router;
use tokio_util::task::AbortOnDropHandle;

use super::RpcService;
use crate::{
downloader::Downloader,
net_protocol::Blobs,
provider::{CustomEventSender, EventSender},
rpc::client::{blobs, tags},
util::local_pool::LocalPool,
Expand Down Expand Up @@ -1054,40 +1056,20 @@ mod tests {

/// Spawns the node
pub async fn spawn(self) -> anyhow::Result<Node> {
let (client, router, rpc_task, _local_pool) = self.setup_router().await?;
Ok(Node {
router,
client,
_rpc_task: AbortOnDropHandle::new(rpc_task),
_local_pool,
})
}

async fn setup_router(
self,
) -> anyhow::Result<(
RpcClient,
iroh_router::Router,
tokio::task::JoinHandle<()>,
LocalPool,
)> {
let store = self.store;
let events = self.events;
let endpoint = self
.endpoint
.unwrap_or_else(|| iroh_net::Endpoint::builder().discovery_n0())
.unwrap_or_else(|| Endpoint::builder().discovery_n0())
.bind()
.await?;
let local_pool = LocalPool::single();
let mut router = iroh_router::Router::builder(endpoint.clone());
let mut router = Router::builder(endpoint.clone());

// Setup blobs
let downloader = crate::downloader::Downloader::new(
store.clone(),
endpoint.clone(),
local_pool.handle().clone(),
);
let blobs = Arc::new(crate::net_protocol::Blobs::new_with_events(
let downloader =
Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
let blobs = Arc::new(Blobs::new_with_events(
store.clone(),
local_pool.handle().clone(),
events,
Expand All @@ -1101,31 +1083,17 @@ mod tests {

// Setup RPC
let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32);
let controller = controller.boxed();
let internal_rpc = internal_rpc.boxed();
let internal_rpc = quic_rpc::RpcServer::new(internal_rpc);

let rpc_server_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move {
loop {
let request = internal_rpc.accept().await;
match request {
Ok(accepting) => {
let blobs = blobs.clone();
tokio::task::spawn(async move {
let (msg, chan) = accepting.read_first().await.unwrap();
blobs.handle_rpc_request(msg, chan).await.unwrap();
});
}
Err(err) => {
tracing::warn!("rpc error: {:?}", err);
}
}
}
let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed();
let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| {
blobs.clone().handle_rpc_request(msg, chan)
});

let client = quic_rpc::RpcClient::new(controller);

Ok((client, router, rpc_server_task, local_pool))
let client = quic_rpc::RpcClient::new(controller).boxed();
Ok(Node {
router,
client,
_rpc_task,
_local_pool: local_pool,
})
}
}

Expand Down

0 comments on commit 815e7f2

Please sign in to comment.