Skip to content

Commit

Permalink
feat: Allow user to register its own hashers (#6)
Browse files Browse the repository at this point in the history
* feat: Allow user to register its own hashers

* fix errors

* Update src/builder.rs

Co-authored-by: Maciej Zwoliński <[email protected]>

---------

Co-authored-by: Maciej Zwoliński <[email protected]>
  • Loading branch information
oblique and zvolin authored Jan 18, 2024
1 parent bf313a8 commit 95a90d7
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 55 deletions.
102 changes: 102 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::sync::Arc;

use blockstore::Blockstore;

use crate::client::{ClientBehaviour, ClientConfig};
use crate::multihasher::{Multihasher, MultihasherTable};
use crate::utils::stream_protocol;
use crate::{BitswapBehaviour, Result};

/// Builder for [`BitswapBehaviour`].
///
/// # Example
///
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use bitmingle::BitswapBehaviour;
/// # fn main() -> bitmingle::Result<()> {
/// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new())
/// .build()?;
/// # Ok(())
/// # }
pub struct BitswapBehaviourBuilder<const S: usize, B>
where
B: Blockstore + Send + Sync + 'static,
{
protocol_prefix: Option<String>,
blockstore: B,
client: ClientConfig,
multihasher: MultihasherTable<S>,
}

impl<const S: usize, B> BitswapBehaviourBuilder<S, B>
where
B: Blockstore + Send + Sync + 'static,
{
/// Creates a new builder for [`BitswapBehaviour`].
pub(crate) fn new(blockstore: B) -> Self {
BitswapBehaviourBuilder {
protocol_prefix: None,
blockstore,
client: ClientConfig {
set_send_dont_have: true,
},
multihasher: MultihasherTable::<S>::new(),
}
}

/// Set a prefix on the protocol name.
///
/// The prefix will be added on `/ipfs/bitswap/1.2.0` and it must start with `/`.
///
/// # Example
///
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use bitmingle::BitswapBehaviour;
/// # fn main() -> bitmingle::Result<()> {
/// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new())
/// .protocol_prefix("/celestia/celestia")
/// .build()?;
/// # Ok(())
/// # }
/// ```
pub fn protocol_prefix(mut self, prefix: &str) -> Self {
self.protocol_prefix = Some(prefix.to_owned());
self
}

/// Client will set `send_dont_have` flag on each query.
pub fn client_set_send_dont_have(mut self, enable: bool) -> Self {
self.client.set_send_dont_have = enable;
self
}

/// Register extra [`Multihasher`].
///
/// Every registration adds new hasher to `BitswapBehavior`. Hashers are used to reconstruct the `Cid` from the received data.
/// `BitswapBehavior` will try them in the reverse order they were registered until some successfully constructs `Multihash`.
/// By default `BitswapBehaviourBuilder` is pre-loaded with [`StandardMultihasher`].
///
/// [`StandardMultihasher`]: crate::multihasher::StandardMultihasher
pub fn register_multihasher<M>(mut self, multihasher: M) -> Self
where
M: Multihasher<S> + Send + Sync + 'static,
{
self.multihasher.register(multihasher);
self
}

/// Build a [`BitswapBehaviour`].
pub fn build(self) -> Result<BitswapBehaviour<S, B>> {
let blockstore = Arc::new(self.blockstore);
let multihasher = Arc::new(self.multihasher);
let protocol_prefix = self.protocol_prefix.as_deref();

Ok(BitswapBehaviour {
protocol: stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?,
client: ClientBehaviour::new(self.client, blockstore, multihasher, protocol_prefix)?,
})
}
}
13 changes: 7 additions & 6 deletions src/cid_prefix.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use cid::{CidGeneric, Version};
use multihash_codetable::{Code, MultihashDigest};

use crate::utils::convert_multihash;
use crate::multihasher::MultihasherTable;

const DAG_PB: u64 = 0x70;
const SHA2_256: u64 = 0x12;
Expand Down Expand Up @@ -83,14 +82,16 @@ impl CidPrefix {
}
}

pub(crate) fn to_cid<const S: usize>(&self, data: &[u8]) -> Option<CidGeneric<S>> {
pub(crate) fn to_cid<const S: usize>(
&self,
hasher: &MultihasherTable<S>,
data: &[u8],
) -> Option<CidGeneric<S>> {
if self.multihash_size > S {
return None;
}

// TODO: Provide a way for user to have additional codetable
let hash = Code::try_from(self.multihash_code).ok()?.digest(data);
let hash = convert_multihash(&hash)?;
let hash = hasher.digest(self.multihash_code, data)?;

CidGeneric::new(self.version, self.codec, hash).ok()
}
Expand Down
27 changes: 16 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Mutex;

use crate::cid_prefix::CidPrefix;
use crate::message::Codec;
use crate::multihasher::MultihasherTable;
use crate::proto::message::mod_Message::{BlockPresenceType, Wantlist as ProtoWantlist};
use crate::proto::message::Message;
use crate::utils::{convert_cid, stream_protocol};
Expand All @@ -33,7 +34,7 @@ use crate::{BitswapError, BitswapEvent, Result, ToBehaviourEvent, ToHandlerEvent
const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryId(u64);
pub struct BitswapQueryId(u64);

#[derive(Debug)]
pub struct ClientConfig {
Expand All @@ -50,7 +51,7 @@ impl Default for ClientConfig {

enum TaskResult<const S: usize> {
Get(
QueryId,
BitswapQueryId,
CidGeneric<S>,
Result<Option<Vec<u8>>, BlockstoreError>,
),
Expand All @@ -68,11 +69,12 @@ where
queue: VecDeque<ToSwarm<BitswapEvent, ToHandlerEvent>>,
wantlist: Wantlist<S>,
peers: FnvHashMap<PeerId, PeerState<S>>,
cid_to_queries: FnvHashMap<CidGeneric<S>, SmallVec<[QueryId; 1]>>,
cid_to_queries: FnvHashMap<CidGeneric<S>, SmallVec<[BitswapQueryId; 1]>>,
tasks: FuturesUnordered<BoxFuture<'static, TaskResult<S>>>,
query_abort_handle: FnvHashMap<QueryId, AbortHandle>,
query_abort_handle: FnvHashMap<BitswapQueryId, AbortHandle>,
next_query_id: u64,
waker: Arc<AtomicWaker>,
multihasher: Arc<MultihasherTable<S>>,
}

#[derive(Debug)]
Expand All @@ -97,6 +99,7 @@ where
pub(crate) fn new(
config: ClientConfig,
store: Arc<B>,
multihasher: Arc<MultihasherTable<S>>,
protocol_prefix: Option<&str>,
) -> Result<Self> {
let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?;
Expand All @@ -113,6 +116,7 @@ where
query_abort_handle: FnvHashMap::default(),
next_query_id: 0,
waker: Arc::new(AtomicWaker::new()),
multihasher,
})
}

Expand All @@ -136,14 +140,14 @@ where
}
}

fn next_query_id(&mut self) -> QueryId {
let id = QueryId(self.next_query_id);
fn next_query_id(&mut self) -> BitswapQueryId {
let id = BitswapQueryId(self.next_query_id);
self.next_query_id += 1;
id
}

/// Schedule a `Blockstore::get` for the specified cid
fn schedule_store_get(&mut self, query_id: QueryId, cid: CidGeneric<S>) {
fn schedule_store_get(&mut self, query_id: BitswapQueryId, cid: CidGeneric<S>) {
let store = self.store.clone();
let (handle, reg) = AbortHandle::new_pair();

Expand Down Expand Up @@ -175,7 +179,7 @@ where
);
}

pub(crate) fn get<const CS: usize>(&mut self, cid: &CidGeneric<CS>) -> QueryId {
pub(crate) fn get<const CS: usize>(&mut self, cid: &CidGeneric<CS>) -> BitswapQueryId {
let query_id = self.next_query_id();

match convert_cid(cid) {
Expand All @@ -192,7 +196,7 @@ where
query_id
}

pub(crate) fn cancel(&mut self, query_id: QueryId) {
pub(crate) fn cancel(&mut self, query_id: BitswapQueryId) {
if let Some(abort_handle) = self.query_abort_handle.remove(&query_id) {
abort_handle.abort();
}
Expand Down Expand Up @@ -240,7 +244,7 @@ where
continue;
};

let Some(cid) = cid_prefix.to_cid(&block.data) else {
let Some(cid) = cid_prefix.to_cid(&self.multihasher, &block.data) else {
continue;
};

Expand Down Expand Up @@ -891,7 +895,8 @@ mod tests {

async fn new_client() -> ClientBehaviour<64, InMemoryBlockstore<64>> {
let store = blockstore().await;
ClientBehaviour::<64, _>::new(ClientConfig::default(), store, None).unwrap()
let multihasher = Arc::new(MultihasherTable::<64>::new());
ClientBehaviour::<64, _>::new(ClientConfig::default(), store, multihasher, None).unwrap()
}

fn expect_send_wantlist_event(
Expand Down
57 changes: 22 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,29 @@ use libp2p::{
Multiaddr, PeerId,
};

mod builder;
mod cid_prefix;
mod client;
mod message;
pub mod multihasher;
mod proto;
#[cfg(test)]
mod test_utils;
mod utils;
pub mod utils;
mod wantlist;

use crate::client::{ClientBehaviour, ClientConnectionHandler};
use crate::message::Codec;
use crate::proto::message::mod_Message::Wantlist as ProtoWantlist;
use crate::proto::message::Message;
use crate::utils::stream_protocol;

pub use crate::client::{ClientConfig, QueryId};
pub use crate::builder::BitswapBehaviourBuilder;
pub use crate::client::BitswapQueryId;

#[derive(Debug)]
pub struct BitswapBehaviour<const MAX_MULTIHASH_SIZE: usize, B>
where
B: Blockstore + Send + Sync,
B: Blockstore + Send + Sync + 'static,
{
protocol: StreamProtocol,
client: ClientBehaviour<MAX_MULTIHASH_SIZE, B>,
Expand All @@ -47,11 +49,11 @@ where
#[derive(Debug)]
pub enum BitswapEvent {
GetQueryResponse {
query_id: QueryId,
query_id: BitswapQueryId,
data: Vec<u8>,
},
GetQueryError {
query_id: QueryId,
query_id: BitswapQueryId,
error: BitswapError,
},
}
Expand All @@ -61,46 +63,28 @@ pub enum BitswapError {
#[error("Invalid multihash size")]
InvalidMultihashSize,

#[error("Invalid protocol prefix")]
InvalidProtocolPrefix,
#[error("Invalid protocol prefix: {0}")]
InvalidProtocolPrefix(String),

#[error("Blockstore error: {0}")]
Blockstore(#[from] BlockstoreError),
}

pub type Result<T, E = BitswapError> = std::result::Result<T, E>;

#[derive(Debug, Default)]
pub struct BitswapConfig<B>
where
B: Blockstore + Send + Sync,
{
/// The prefix that will be added on `/ipfs/bitswap/1.2.0`. It must start with `/`.
pub protocol_prefix: Option<String>,
pub store: B,
pub client: ClientConfig,
}

impl<const MAX_MULTIHASH_SIZE: usize, B> BitswapBehaviour<MAX_MULTIHASH_SIZE, B>
where
B: Blockstore + Send + Sync + 'static,
{
pub fn new(config: BitswapConfig<B>) -> Result<Self> {
let store = Arc::new(config.store);
let protocol_prefix = config.protocol_prefix.as_deref();
let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?;

Ok(BitswapBehaviour {
protocol: protocol.clone(),
client: ClientBehaviour::new(config.client, store, protocol_prefix)?,
})
pub fn builder(blockstore: B) -> BitswapBehaviourBuilder<MAX_MULTIHASH_SIZE, B> {
BitswapBehaviourBuilder::new(blockstore)
}

pub fn get<const S: usize>(&mut self, cid: &CidGeneric<S>) -> QueryId {
pub fn get<const S: usize>(&mut self, cid: &CidGeneric<S>) -> BitswapQueryId {
self.client.get(cid)
}

pub fn cancel(&mut self, query_id: QueryId) {
pub fn cancel(&mut self, query_id: BitswapQueryId) {
self.client.cancel(query_id)
}
}
Expand All @@ -110,7 +94,7 @@ impl<const MAX_MULTIHASH_SIZE: usize, B> NetworkBehaviour
where
B: Blockstore + Send + Sync + 'static,
{
type ConnectionHandler = Handler<MAX_MULTIHASH_SIZE>;
type ConnectionHandler = BitswapConnectionHandler<MAX_MULTIHASH_SIZE>;
type ToSwarm = BitswapEvent;

fn handle_established_inbound_connection(
Expand All @@ -120,7 +104,7 @@ where
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
Ok(Handler {
Ok(BitswapConnectionHandler {
peer,
protocol: self.protocol.clone(),
client_handler: self.client.new_connection_handler(peer),
Expand All @@ -135,7 +119,7 @@ where
_addr: &Multiaddr,
_role_override: Endpoint,
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
Ok(Handler {
Ok(BitswapConnectionHandler {
peer,
protocol: self.protocol.clone(),
client_handler: self.client.new_connection_handler(peer),
Expand Down Expand Up @@ -167,6 +151,7 @@ where
}

#[derive(Debug)]
#[doc(hidden)]
pub enum ToBehaviourEvent<const S: usize> {
IncomingMessage(PeerId, Message),
}
Expand All @@ -178,14 +163,16 @@ pub enum ToHandlerEvent {
}

#[derive(Debug)]
pub struct Handler<const MAX_MULTIHASH_SIZE: usize> {
pub struct BitswapConnectionHandler<const MAX_MULTIHASH_SIZE: usize> {
peer: PeerId,
protocol: StreamProtocol,
client_handler: ClientConnectionHandler<MAX_MULTIHASH_SIZE>,
incoming_streams: SelectAll<StreamFramedRead>,
}

impl<const MAX_MULTIHASH_SIZE: usize> ConnectionHandler for Handler<MAX_MULTIHASH_SIZE> {
impl<const MAX_MULTIHASH_SIZE: usize> ConnectionHandler
for BitswapConnectionHandler<MAX_MULTIHASH_SIZE>
{
type ToBehaviour = ToBehaviourEvent<MAX_MULTIHASH_SIZE>;
type FromBehaviour = ToHandlerEvent;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
Expand Down
Loading

0 comments on commit 95a90d7

Please sign in to comment.