Skip to content

Commit

Permalink
feat: Add bitswap server
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Feb 23, 2024
1 parent 239705f commit 14464e6
Show file tree
Hide file tree
Showing 9 changed files with 3,059 additions and 50 deletions.
2,063 changes: 2,035 additions & 28 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,19 @@ unsigned-varint = "0.8"
void = "1"

[dev-dependencies]
anyhow = "1.0"
clap = { version = "4.4", features = ["derive"] }
hex = "0.4"
libp2p = { version = "0.53", features = ["tokio", "tcp", "identify", "macros"] }
libp2p-identity = { version = "0.2", features = ["rand"] }
libp2p-identify = "0.44"
libp2p-noise = "0.44"
libp2p-yamux = "0.45"
multihash-codetable = { version = "0.1", features = ["digest", "sha2"] }
tokio = { version = "1", features = ["rt", "macros", "time"] }
tokio = { version = "1", features = ["rt", "macros", "time", "sync"] }

tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

[features]
wasm-bindgen = ["futures-timer/wasm-bindgen"]
Expand Down
150 changes: 150 additions & 0 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use std::collections::HashMap;
use std::time::Duration;

use anyhow::Result;
use blockstore::{
block::{Block, CidError},
Blockstore, InMemoryBlockstore,
};
use cid::Cid;
use clap::Parser;
use libp2p::{
futures::StreamExt, identify, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, Multiaddr,
SwarmBuilder,
};
use multihash_codetable::{Code, MultihashDigest};
use tracing::{debug, info};

const MAX_MULTIHASH_LENGHT: usize = 64;
const RAW_CODEC: u64 = 0x55;

#[derive(Debug, Parser)]
struct Args {
/// Peers to connect to
#[arg(short, long = "peer")]
pub(crate) peers: Vec<Multiaddr>,

/// CIDs to request
pub(crate) cids: Vec<Cid>,

/// Listen on provided port
#[arg(short, long = "listen")]
pub(crate) listen_port: Option<u16>,

/// Load provided string into blockstore on start
#[arg(long)]
pub(crate) preload_blockstore_string: Vec<String>,
}

#[derive(NetworkBehaviour)]
struct Behaviour {
identify: identify::Behaviour,
bitswap: beetswap::Behaviour<MAX_MULTIHASH_LENGHT, InMemoryBlockstore<MAX_MULTIHASH_LENGHT>>,
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let args = Args::parse();

let _guard = init_tracing();

let store = InMemoryBlockstore::new();
for preload_string in args.preload_blockstore_string {
let block = StringBlock(preload_string);
let cid = block.cid()?;
info!("inserted {cid} with content '{}'", block.0);
store.put_keyed(&cid, block.data()).await?;
}

let mut swarm = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
libp2p_noise::Config::new,
libp2p_yamux::Config::default,
)?
.with_behaviour(|key| Behaviour {
identify: identify::Behaviour::new(identify::Config::new(
"/ipfs/id/1.0.0".to_string(),
key.public(),
)),
bitswap: beetswap::Behaviour::new(store),
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

if let Some(port) = args.listen_port {
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{port}").parse()?)?;
}

for peer in args.peers {
swarm.dial(peer)?;
}

let mut queries = HashMap::new();
for cid in args.cids {
let query_id = swarm.behaviour_mut().bitswap.get(&cid);
queries.insert(query_id, cid);
info!("requested cid {cid}: {query_id:?}");
}

loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => debug!("Listening on {address:?}"),
// Prints peer id identify info is being sent to.
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify)) => match identify {
identify::Event::Sent { peer_id, .. } => {
info!("Sent identify info to {peer_id:?}");
}
identify::Event::Received { info, .. } => {
info!("Received {info:?}")
}
_ => (),
},
SwarmEvent::Behaviour(BehaviourEvent::Bitswap(bitswap)) => match bitswap {
beetswap::Event::GetQueryResponse { query_id, data } => {
let cid = queries.get(&query_id).expect("unknown cid received");
info!("received response for {cid:?}: {data:?}");
}
beetswap::Event::GetQueryError { query_id, error } => {
let cid = queries.get(&query_id).expect("unknown cid received");
info!("received error for {cid:?}: {error}");
}
},
_ => (),
}
}
}

struct StringBlock(pub String);

impl Block<64> for StringBlock {
fn cid(&self) -> Result<Cid, CidError> {
let hash = Code::Sha2_256.digest(self.0.as_ref());
Ok(Cid::new_v1(RAW_CODEC, hash))
}

fn data(&self) -> &[u8] {
self.0.as_ref()
}
}

fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
.from_env_lossy();

tracing_subscriber::fmt()
.event_format(
tracing_subscriber::fmt::format()
.with_file(true)
.with_line_number(true),
)
.with_env_filter(filter)
.with_writer(non_blocking)
.init();

guard
}
4 changes: 3 additions & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use blockstore::Blockstore;

use crate::client::{ClientBehaviour, ClientConfig};
use crate::multihasher::{Multihasher, MultihasherTable};
use crate::server::ServerBehaviour;
use crate::utils::stream_protocol;
use crate::{Behaviour, Error, Result};

Expand Down Expand Up @@ -117,7 +118,8 @@ where
Behaviour {
protocol: stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")
.expect("prefix checked by beetswap::BehaviourBuilder::protocol_prefix"),
client: ClientBehaviour::new(self.client, blockstore, protocol_prefix),
client: ClientBehaviour::new(self.client, blockstore.clone(), protocol_prefix),
server: ServerBehaviour::new(blockstore, protocol_prefix),
multihasher,
}
}
Expand Down
46 changes: 35 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{hash_map, VecDeque};
use std::fmt;
use std::mem::take;
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use std::time::Duration;
Expand Down Expand Up @@ -27,6 +28,7 @@ use crate::proto::message::mod_Message::{BlockPresenceType, Wantlist as ProtoWan
use crate::proto::message::Message;
use crate::utils::{convert_cid, stream_protocol};
use crate::wantlist::{Wantlist, WantlistState};
use crate::StreamRequester;
use crate::{Error, Event, Result, ToBehaviourEvent, ToHandlerEvent};

const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -54,7 +56,7 @@ enum TaskResult<const S: usize> {
CidGeneric<S>,
Result<Option<Vec<u8>>, BlockstoreError>,
),
Set(Result<(), BlockstoreError>),
Set(Result<Vec<(CidGeneric<S>, Vec<u8>)>, BlockstoreError>),
Cancelled,
}

Expand All @@ -74,6 +76,8 @@ where
next_query_id: u64,
waker: Arc<AtomicWaker>,
send_full_timer: Delay,

new_blocks: Vec<(CidGeneric<S>, Vec<u8>)>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -113,6 +117,7 @@ where
next_query_id: 0,
waker: Arc::new(AtomicWaker::new()),
send_full_timer: Delay::new(SEND_FULL_INTERVAL),
new_blocks: Vec::new(),
}
}

Expand Down Expand Up @@ -178,7 +183,10 @@ where

self.tasks.push(
async move {
let res = store.put_many_keyed(blocks.into_iter()).await;
let res = store
.put_many_keyed(blocks.clone().into_iter())
.await
.map(|_| blocks);
TaskResult::Set(res)
}
.boxed(),
Expand Down Expand Up @@ -372,7 +380,9 @@ where
}));
}

TaskResult::Set(Ok(_)) => {}
TaskResult::Set(Ok(blocks)) => {
self.new_blocks.extend(blocks);
}

// TODO: log it
TaskResult::Set(Err(_e)) => {}
Expand All @@ -383,7 +393,7 @@ where

// If we didn't return an event, we need to retry the whole loop
continue;
}
};

if self.update_handlers() {
// New events generated, loop again to send them.
Expand All @@ -393,6 +403,10 @@ where
return Poll::Pending;
}
}

pub(crate) fn get_new_blocks(&mut self) -> Vec<(CidGeneric<S>, Vec<u8>)> {
take(&mut self.new_blocks)
}
}

pub(crate) struct ClientConnectionHandler<const S: usize> {
Expand Down Expand Up @@ -434,7 +448,9 @@ impl<const S: usize> ClientConnectionHandler<S> {

fn poll_outgoing_no_stream(
&mut self,
) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), ToBehaviourEvent<S>>> {
) -> Poll<
ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, StreamRequester, ToBehaviourEvent<S>>,
> {
// `stream_requested` already checked in `poll_outgoing`
debug_assert!(!self.stream_requested);
// `wantlist` and `sending_state` must be both `Some` or both `None`
Expand All @@ -445,13 +461,13 @@ impl<const S: usize> ClientConnectionHandler<S> {
return Poll::Pending;
}

// There are data to send, so request a new stream.
// There is data to send, so request a new stream.
self.stream_requested = true;

Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
ReadyUpgrade::new(self.protocol.clone()),
(), // TODO: maybe we can say here that we are the client?
StreamRequester::Client,
),
})
}
Expand All @@ -470,9 +486,11 @@ impl<const S: usize> ClientConnectionHandler<S> {
fn poll_outgoing(
&mut self,
cx: &mut Context,
) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), ToBehaviourEvent<S>>> {
) -> Poll<
ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, StreamRequester, ToBehaviourEvent<S>>,
> {
loop {
if self.stream_requested {
if self.stream_requested() {
// We can not progress until we have a stream
return Poll::Pending;
}
Expand Down Expand Up @@ -514,8 +532,14 @@ impl<const S: usize> ClientConnectionHandler<S> {
pub(crate) fn poll(
&mut self,
cx: &mut Context,
) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), ToBehaviourEvent<S>>> {
self.poll_outgoing(cx)
) -> Poll<
ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, StreamRequester, ToBehaviourEvent<S>>,
> {
if let Poll::Ready(ready) = self.poll_outgoing(cx) {
return Poll::Ready(ready);
}

Poll::Pending
}
}

Expand Down
Loading

0 comments on commit 14464e6

Please sign in to comment.