From 95a90d7b9d8d5aecb881f0379fd2e74d83161e9b Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 18 Jan 2024 22:25:47 +0200 Subject: [PATCH] feat: Allow user to register its own hashers (#6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Allow user to register its own hashers * fix errors * Update src/builder.rs Co-authored-by: Maciej Zwoliński --------- Co-authored-by: Maciej Zwoliński --- src/builder.rs | 102 +++++++++++++++++++++++++++++++++++++++++++++ src/cid_prefix.rs | 13 +++--- src/client.rs | 27 +++++++----- src/lib.rs | 57 ++++++++++--------------- src/multihasher.rs | 62 +++++++++++++++++++++++++++ src/utils.rs | 8 ++-- 6 files changed, 214 insertions(+), 55 deletions(-) create mode 100644 src/builder.rs create mode 100644 src/multihasher.rs diff --git a/src/builder.rs b/src/builder.rs new file mode 100644 index 0000000..6480edf --- /dev/null +++ b/src/builder.rs @@ -0,0 +1,102 @@ +use std::sync::Arc; + +use blockstore::Blockstore; + +use crate::client::{ClientBehaviour, ClientConfig}; +use crate::multihasher::{Multihasher, MultihasherTable}; +use crate::utils::stream_protocol; +use crate::{BitswapBehaviour, Result}; + +/// Builder for [`BitswapBehaviour`]. +/// +/// # Example +/// +/// ```rust,no_run +/// # use blockstore::InMemoryBlockstore; +/// # use bitmingle::BitswapBehaviour; +/// # fn main() -> bitmingle::Result<()> { +/// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new()) +/// .build()?; +/// # Ok(()) +/// # } +pub struct BitswapBehaviourBuilder +where + B: Blockstore + Send + Sync + 'static, +{ + protocol_prefix: Option, + blockstore: B, + client: ClientConfig, + multihasher: MultihasherTable, +} + +impl BitswapBehaviourBuilder +where + B: Blockstore + Send + Sync + 'static, +{ + /// Creates a new builder for [`BitswapBehaviour`]. + pub(crate) fn new(blockstore: B) -> Self { + BitswapBehaviourBuilder { + protocol_prefix: None, + blockstore, + client: ClientConfig { + set_send_dont_have: true, + }, + multihasher: MultihasherTable::::new(), + } + } + + /// Set a prefix on the protocol name. + /// + /// The prefix will be added on `/ipfs/bitswap/1.2.0` and it must start with `/`. + /// + /// # Example + /// + /// ```rust,no_run + /// # use blockstore::InMemoryBlockstore; + /// # use bitmingle::BitswapBehaviour; + /// # fn main() -> bitmingle::Result<()> { + /// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new()) + /// .protocol_prefix("/celestia/celestia") + /// .build()?; + /// # Ok(()) + /// # } + /// ``` + pub fn protocol_prefix(mut self, prefix: &str) -> Self { + self.protocol_prefix = Some(prefix.to_owned()); + self + } + + /// Client will set `send_dont_have` flag on each query. + pub fn client_set_send_dont_have(mut self, enable: bool) -> Self { + self.client.set_send_dont_have = enable; + self + } + + /// Register extra [`Multihasher`]. + /// + /// Every registration adds new hasher to `BitswapBehavior`. Hashers are used to reconstruct the `Cid` from the received data. + /// `BitswapBehavior` will try them in the reverse order they were registered until some successfully constructs `Multihash`. + /// By default `BitswapBehaviourBuilder` is pre-loaded with [`StandardMultihasher`]. + + /// + /// [`StandardMultihasher`]: crate::multihasher::StandardMultihasher + pub fn register_multihasher(mut self, multihasher: M) -> Self + where + M: Multihasher + Send + Sync + 'static, + { + self.multihasher.register(multihasher); + self + } + + /// Build a [`BitswapBehaviour`]. + pub fn build(self) -> Result> { + let blockstore = Arc::new(self.blockstore); + let multihasher = Arc::new(self.multihasher); + let protocol_prefix = self.protocol_prefix.as_deref(); + + Ok(BitswapBehaviour { + protocol: stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?, + client: ClientBehaviour::new(self.client, blockstore, multihasher, protocol_prefix)?, + }) + } +} diff --git a/src/cid_prefix.rs b/src/cid_prefix.rs index 491e6b3..60d02b1 100644 --- a/src/cid_prefix.rs +++ b/src/cid_prefix.rs @@ -1,7 +1,6 @@ use cid::{CidGeneric, Version}; -use multihash_codetable::{Code, MultihashDigest}; -use crate::utils::convert_multihash; +use crate::multihasher::MultihasherTable; const DAG_PB: u64 = 0x70; const SHA2_256: u64 = 0x12; @@ -83,14 +82,16 @@ impl CidPrefix { } } - pub(crate) fn to_cid(&self, data: &[u8]) -> Option> { + pub(crate) fn to_cid( + &self, + hasher: &MultihasherTable, + data: &[u8], + ) -> Option> { if self.multihash_size > S { return None; } - // TODO: Provide a way for user to have additional codetable - let hash = Code::try_from(self.multihash_code).ok()?.digest(data); - let hash = convert_multihash(&hash)?; + let hash = hasher.digest(self.multihash_code, data)?; CidGeneric::new(self.version, self.codec, hash).ok() } diff --git a/src/client.rs b/src/client.rs index 596d3ca..27b5996 100644 --- a/src/client.rs +++ b/src/client.rs @@ -24,6 +24,7 @@ use std::sync::Mutex; use crate::cid_prefix::CidPrefix; use crate::message::Codec; +use crate::multihasher::MultihasherTable; use crate::proto::message::mod_Message::{BlockPresenceType, Wantlist as ProtoWantlist}; use crate::proto::message::Message; use crate::utils::{convert_cid, stream_protocol}; @@ -33,7 +34,7 @@ use crate::{BitswapError, BitswapEvent, Result, ToBehaviourEvent, ToHandlerEvent const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct QueryId(u64); +pub struct BitswapQueryId(u64); #[derive(Debug)] pub struct ClientConfig { @@ -50,7 +51,7 @@ impl Default for ClientConfig { enum TaskResult { Get( - QueryId, + BitswapQueryId, CidGeneric, Result>, BlockstoreError>, ), @@ -68,11 +69,12 @@ where queue: VecDeque>, wantlist: Wantlist, peers: FnvHashMap>, - cid_to_queries: FnvHashMap, SmallVec<[QueryId; 1]>>, + cid_to_queries: FnvHashMap, SmallVec<[BitswapQueryId; 1]>>, tasks: FuturesUnordered>>, - query_abort_handle: FnvHashMap, + query_abort_handle: FnvHashMap, next_query_id: u64, waker: Arc, + multihasher: Arc>, } #[derive(Debug)] @@ -97,6 +99,7 @@ where pub(crate) fn new( config: ClientConfig, store: Arc, + multihasher: Arc>, protocol_prefix: Option<&str>, ) -> Result { let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?; @@ -113,6 +116,7 @@ where query_abort_handle: FnvHashMap::default(), next_query_id: 0, waker: Arc::new(AtomicWaker::new()), + multihasher, }) } @@ -136,14 +140,14 @@ where } } - fn next_query_id(&mut self) -> QueryId { - let id = QueryId(self.next_query_id); + fn next_query_id(&mut self) -> BitswapQueryId { + let id = BitswapQueryId(self.next_query_id); self.next_query_id += 1; id } /// Schedule a `Blockstore::get` for the specified cid - fn schedule_store_get(&mut self, query_id: QueryId, cid: CidGeneric) { + fn schedule_store_get(&mut self, query_id: BitswapQueryId, cid: CidGeneric) { let store = self.store.clone(); let (handle, reg) = AbortHandle::new_pair(); @@ -175,7 +179,7 @@ where ); } - pub(crate) fn get(&mut self, cid: &CidGeneric) -> QueryId { + pub(crate) fn get(&mut self, cid: &CidGeneric) -> BitswapQueryId { let query_id = self.next_query_id(); match convert_cid(cid) { @@ -192,7 +196,7 @@ where query_id } - pub(crate) fn cancel(&mut self, query_id: QueryId) { + pub(crate) fn cancel(&mut self, query_id: BitswapQueryId) { if let Some(abort_handle) = self.query_abort_handle.remove(&query_id) { abort_handle.abort(); } @@ -240,7 +244,7 @@ where continue; }; - let Some(cid) = cid_prefix.to_cid(&block.data) else { + let Some(cid) = cid_prefix.to_cid(&self.multihasher, &block.data) else { continue; }; @@ -891,7 +895,8 @@ mod tests { async fn new_client() -> ClientBehaviour<64, InMemoryBlockstore<64>> { let store = blockstore().await; - ClientBehaviour::<64, _>::new(ClientConfig::default(), store, None).unwrap() + let multihasher = Arc::new(MultihasherTable::<64>::new()); + ClientBehaviour::<64, _>::new(ClientConfig::default(), store, multihasher, None).unwrap() } fn expect_send_wantlist_event( diff --git a/src/lib.rs b/src/lib.rs index 67c106b..f5aeb9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,27 +18,29 @@ use libp2p::{ Multiaddr, PeerId, }; +mod builder; mod cid_prefix; mod client; mod message; +pub mod multihasher; mod proto; #[cfg(test)] mod test_utils; -mod utils; +pub mod utils; mod wantlist; use crate::client::{ClientBehaviour, ClientConnectionHandler}; use crate::message::Codec; use crate::proto::message::mod_Message::Wantlist as ProtoWantlist; use crate::proto::message::Message; -use crate::utils::stream_protocol; -pub use crate::client::{ClientConfig, QueryId}; +pub use crate::builder::BitswapBehaviourBuilder; +pub use crate::client::BitswapQueryId; #[derive(Debug)] pub struct BitswapBehaviour where - B: Blockstore + Send + Sync, + B: Blockstore + Send + Sync + 'static, { protocol: StreamProtocol, client: ClientBehaviour, @@ -47,11 +49,11 @@ where #[derive(Debug)] pub enum BitswapEvent { GetQueryResponse { - query_id: QueryId, + query_id: BitswapQueryId, data: Vec, }, GetQueryError { - query_id: QueryId, + query_id: BitswapQueryId, error: BitswapError, }, } @@ -61,8 +63,8 @@ pub enum BitswapError { #[error("Invalid multihash size")] InvalidMultihashSize, - #[error("Invalid protocol prefix")] - InvalidProtocolPrefix, + #[error("Invalid protocol prefix: {0}")] + InvalidProtocolPrefix(String), #[error("Blockstore error: {0}")] Blockstore(#[from] BlockstoreError), @@ -70,37 +72,19 @@ pub enum BitswapError { pub type Result = std::result::Result; -#[derive(Debug, Default)] -pub struct BitswapConfig -where - B: Blockstore + Send + Sync, -{ - /// The prefix that will be added on `/ipfs/bitswap/1.2.0`. It must start with `/`. - pub protocol_prefix: Option, - pub store: B, - pub client: ClientConfig, -} - impl BitswapBehaviour where B: Blockstore + Send + Sync + 'static, { - pub fn new(config: BitswapConfig) -> Result { - let store = Arc::new(config.store); - let protocol_prefix = config.protocol_prefix.as_deref(); - let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?; - - Ok(BitswapBehaviour { - protocol: protocol.clone(), - client: ClientBehaviour::new(config.client, store, protocol_prefix)?, - }) + pub fn builder(blockstore: B) -> BitswapBehaviourBuilder { + BitswapBehaviourBuilder::new(blockstore) } - pub fn get(&mut self, cid: &CidGeneric) -> QueryId { + pub fn get(&mut self, cid: &CidGeneric) -> BitswapQueryId { self.client.get(cid) } - pub fn cancel(&mut self, query_id: QueryId) { + pub fn cancel(&mut self, query_id: BitswapQueryId) { self.client.cancel(query_id) } } @@ -110,7 +94,7 @@ impl NetworkBehaviour where B: Blockstore + Send + Sync + 'static, { - type ConnectionHandler = Handler; + type ConnectionHandler = BitswapConnectionHandler; type ToSwarm = BitswapEvent; fn handle_established_inbound_connection( @@ -120,7 +104,7 @@ where _local_addr: &Multiaddr, _remote_addr: &Multiaddr, ) -> Result { - Ok(Handler { + Ok(BitswapConnectionHandler { peer, protocol: self.protocol.clone(), client_handler: self.client.new_connection_handler(peer), @@ -135,7 +119,7 @@ where _addr: &Multiaddr, _role_override: Endpoint, ) -> Result { - Ok(Handler { + Ok(BitswapConnectionHandler { peer, protocol: self.protocol.clone(), client_handler: self.client.new_connection_handler(peer), @@ -167,6 +151,7 @@ where } #[derive(Debug)] +#[doc(hidden)] pub enum ToBehaviourEvent { IncomingMessage(PeerId, Message), } @@ -178,14 +163,16 @@ pub enum ToHandlerEvent { } #[derive(Debug)] -pub struct Handler { +pub struct BitswapConnectionHandler { peer: PeerId, protocol: StreamProtocol, client_handler: ClientConnectionHandler, incoming_streams: SelectAll, } -impl ConnectionHandler for Handler { +impl ConnectionHandler + for BitswapConnectionHandler +{ type ToBehaviour = ToBehaviourEvent; type FromBehaviour = ToHandlerEvent; type InboundProtocol = ReadyUpgrade; diff --git a/src/multihasher.rs b/src/multihasher.rs new file mode 100644 index 0000000..ce643a6 --- /dev/null +++ b/src/multihasher.rs @@ -0,0 +1,62 @@ +use std::collections::VecDeque; +use std::fmt; + +use multihash::Multihash; +use multihash_codetable::MultihashDigest; + +use crate::utils::convert_multihash; + +pub trait Multihasher { + fn digest(&self, multihash_code: u64, input: &[u8]) -> Option>; +} + +/// Multihasher that uses [`multihash_codetable::Code`] +pub struct StandardMultihasher; + +impl Multihasher for StandardMultihasher { + fn digest(&self, multihash_code: u64, input: &[u8]) -> Option> { + let hash = multihash_codetable::Code::try_from(multihash_code) + .ok()? + .digest(input); + convert_multihash(&hash) + } +} + +pub(crate) struct MultihasherTable { + multihashers: VecDeque + Send + Sync + 'static>>, +} + +impl fmt::Debug for MultihasherTable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("MultihasherTable { .. }") + } +} + +impl MultihasherTable { + pub(crate) fn new() -> Self { + let mut table = MultihasherTable { + multihashers: VecDeque::new(), + }; + + table.register(StandardMultihasher); + + table + } + + pub(crate) fn register(&mut self, multihasher: M) + where + M: Multihasher + Send + Sync + 'static, + { + self.multihashers.push_front(Box::new(multihasher)); + } + + pub(crate) fn digest(&self, multihash_code: u64, input: &[u8]) -> Option> { + for multihasher in &self.multihashers { + if let Some(hash) = multihasher.digest(multihash_code, input) { + return Some(hash); + } + } + + None + } +} diff --git a/src/utils.rs b/src/utils.rs index 27c8501..a23eb3c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -4,14 +4,16 @@ use multihash::Multihash; use crate::{BitswapError, Result}; -pub(crate) fn convert_cid( +/// Helper utility that converts `CidGeneric` to `CidGeneric` +pub fn convert_cid( cid: &CidGeneric, ) -> Option> { let hash = convert_multihash(cid.hash())?; CidGeneric::new(cid.version(), cid.codec(), hash).ok() } -pub(crate) fn convert_multihash( +/// Helper utility that converts `Multihash` to `Multihash` +pub fn convert_multihash( hash: &Multihash, ) -> Option> { Multihash::::wrap(hash.code(), hash.digest()).ok() @@ -23,7 +25,7 @@ pub(crate) fn stream_protocol( ) -> Result { match prefix { Some(prefix) => StreamProtocol::try_from_owned(format!("{prefix}{protocol}")) - .map_err(|_| BitswapError::InvalidProtocolPrefix), + .map_err(|_| BitswapError::InvalidProtocolPrefix(prefix.to_owned())), None => Ok(StreamProtocol::new(protocol)), } }