Skip to content

Commit

Permalink
chore: Add more docs and comments (#19)
Browse files Browse the repository at this point in the history
* chore: Add more docs and comments

* Update src/client.rs

Co-authored-by: Maciej Zwoliński <[email protected]>

* add review suggestions

* Update src/builder.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>

* Update src/client.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>

* Update src/client.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>

* Update src/wantlist.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>

* Update src/wantlist.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>

---------

Co-authored-by: Maciej Zwoliński <[email protected]>
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
3 people authored Jan 30, 2024
1 parent 6a1e3ca commit 914452f
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 56 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
WIP
# beetswap

Rust implementation of [`Bitswap`] protocol for [`libp2p`].


[`Bitswap`]: https://specs.ipfs.tech/bitswap-protocolo/
[`libp2p`]: https://docs.rs/libp2p
70 changes: 47 additions & 23 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use blockstore::Blockstore;
use crate::client::{ClientBehaviour, ClientConfig};
use crate::multihasher::{Multihasher, MultihasherTable};
use crate::utils::stream_protocol;
use crate::{BitswapBehaviour, Result};
use crate::{BitswapBehaviour, BitswapError, Result};

/// Builder for [`BitswapBehaviour`].
///
Expand All @@ -14,10 +14,9 @@ use crate::{BitswapBehaviour, Result};
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use beetswap::BitswapBehaviour;
/// # fn main() -> beetswap::Result<()> {
/// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new())
/// .build()?;
/// # Ok(())
/// # fn new() -> BitswapBehaviour<64, InMemoryBlockstore<64>> {
/// BitswapBehaviour::builder(InMemoryBlockstore::new())
/// .build()
/// # }
pub struct BitswapBehaviourBuilder<const S: usize, B>
where
Expand Down Expand Up @@ -47,39 +46,63 @@ where

/// Set a prefix on the protocol name.
///
/// The prefix will be added on `/ipfs/bitswap/1.2.0` and it must start with `/`.
/// The prefix will be added on `/ipfs/bitswap/1.2.0`.
///
/// # Errors
///
/// This function will return an error if `prefix` does not start with a forward slash (`/`).
///
/// # Example
///
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use beetswap::BitswapBehaviour;
/// # fn main() -> beetswap::Result<()> {
/// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new())
/// .protocol_prefix("/celestia/celestia")
/// .build()?;
/// # Ok(())
/// # fn new() -> beetswap::Result<BitswapBehaviour<64, InMemoryBlockstore<64>>> {
/// # Ok(
/// BitswapBehaviour::builder(InMemoryBlockstore::new())
/// .protocol_prefix("/celestia/celestia")?
/// .build()
/// # )
/// # }
/// ```
pub fn protocol_prefix(mut self, prefix: &str) -> Self {
pub fn protocol_prefix(mut self, prefix: &str) -> Result<Self> {
if prefix.starts_with('/') {
return Err(BitswapError::InvalidProtocolPrefix(prefix.to_owned()));
}

self.protocol_prefix = Some(prefix.to_owned());
self
Ok(self)
}

/// Client will set `send_dont_have` flag on each query.
/// Client will set `send_dont_have` flag on each query (enabled by default).
///
/// # Example
///
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use beetswap::BitswapBehaviour;
/// # fn new() -> BitswapBehaviour<64, InMemoryBlockstore<64>> {
/// BitswapBehaviour::builder(InMemoryBlockstore::new())
/// .client_set_send_dont_have(false)
/// .build()
/// # }
pub fn client_set_send_dont_have(mut self, enable: bool) -> Self {
self.client.set_send_dont_have = enable;
self
}

/// Register extra [`Multihasher`].
/// Register an extra [`Multihasher`].
///
/// Every registration adds new hasher to [`BitswapBehaviour`]. Hashers are used to
/// reconstruct the [`Cid`] from the received data. `BitswapBehaviour` will try them
/// in the reverse order they were registered until one successfully constructs
/// [`Multihash`].
///
/// Every registration adds new hasher to `BitswapBehavior`. Hashers are used to reconstruct the `Cid` from the received data.
/// `BitswapBehavior` will try them in the reverse order they were registered until some successfully constructs `Multihash`.
/// By default `BitswapBehaviourBuilder` is pre-loaded with [`StandardMultihasher`].
///
/// [`StandardMultihasher`]: crate::multihasher::StandardMultihasher
/// [`Cid`]: cid::CidGeneric
/// [`Multihash`]: libp2p_core::multihash::Multihash
pub fn register_multihasher<M>(mut self, multihasher: M) -> Self
where
M: Multihasher<S> + Send + Sync + 'static,
Expand All @@ -89,14 +112,15 @@ where
}

/// Build a [`BitswapBehaviour`].
pub fn build(self) -> Result<BitswapBehaviour<S, B>> {
pub fn build(self) -> BitswapBehaviour<S, B> {
let blockstore = Arc::new(self.blockstore);
let multihasher = Arc::new(self.multihasher);
let protocol_prefix = self.protocol_prefix.as_deref();

Ok(BitswapBehaviour {
protocol: stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?,
client: ClientBehaviour::new(self.client, blockstore, multihasher, protocol_prefix)?,
})
BitswapBehaviour {
protocol: stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")
.expect("prefix checked by BitswapBehaviourBuilder::protocol_prefix"),
client: ClientBehaviour::new(self.client, blockstore, multihasher, protocol_prefix),
}
}
}
23 changes: 15 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{BitswapError, BitswapEvent, Result, ToBehaviourEvent, ToHandlerEvent

const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30);

/// ID of an ongoing query.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BitswapQueryId(u64);

Expand Down Expand Up @@ -102,11 +103,12 @@ where
store: Arc<B>,
multihasher: Arc<MultihasherTable<S>>,
protocol_prefix: Option<&str>,
) -> Result<Self> {
let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?;
) -> Self {
let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")
.expect("prefix checked by BitswapBehaviourBuilder::protocol_prefix");
let set_send_dont_have = config.set_send_dont_have;

Ok(ClientBehaviour {
ClientBehaviour {
store,
protocol,
queue: VecDeque::new(),
Expand All @@ -119,7 +121,7 @@ where
waker: Arc::new(AtomicWaker::new()),
multihasher,
send_full_timer: Delay::new(SEND_FULL_INTERVAL),
})
}
}

pub(crate) fn new_connection_handler(&mut self, peer: PeerId) -> ClientConnectionHandler<S> {
Expand Down Expand Up @@ -167,7 +169,7 @@ where
self.tasks.push(
async move {
match Abortable::new(store.get(&cid), reg).await {
// ..And continue the procedure in `pool`. Missing CID will be handled there.
// ..And continue the procedure in `poll`. Missing CID will be handled there.
Ok(res) => TaskResult::Get(query_id, cid, res),
Err(_) => TaskResult::Cancelled,
}
Expand Down Expand Up @@ -195,7 +197,11 @@ where
let query_id = self.next_query_id();

match convert_cid(cid) {
// Schedule an asynchronous get from the blockstore. The result will be provided
// from `poll` and if CID is missing `poll` will query the network.
Some(cid) => self.schedule_store_get(query_id, cid),
// If CID conversion fails, an event with the error will be given to
// the requestor on the next `poll`.
None => {
self.queue
.push_back(ToSwarm::GenerateEvent(BitswapEvent::GetQueryError {
Expand All @@ -217,9 +223,10 @@ where
if let Some(pos) = queries.iter().position(|id| *id == query_id) {
queries.swap_remove(pos);

// If CID doesn't have any other queries requesting it, remove it completely
// If CID doesn't have any other queries requesting it, remove it completely.
// Cancel message will be send to the servers from `poll`.
if queries.is_empty() {
// Cancelling message will be generated from `pool` method
// Cancelling message will be generated from `poll` method
let cid = cid.to_owned();
self.cid_to_queries.remove(&cid);
self.wantlist.remove(&cid);
Expand Down Expand Up @@ -918,7 +925,7 @@ mod tests {
async fn new_client() -> ClientBehaviour<64, InMemoryBlockstore<64>> {
let store = blockstore().await;
let multihasher = Arc::new(MultihasherTable::<64>::new());
ClientBehaviour::<64, _>::new(ClientConfig::default(), store, multihasher, None).unwrap()
ClientBehaviour::<64, _>::new(ClientConfig::default(), store, multihasher, None)
}

fn expect_send_wantlist_event(
Expand Down
15 changes: 15 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::proto::message::Message;
pub use crate::builder::BitswapBehaviourBuilder;
pub use crate::client::BitswapQueryId;

/// [`NetworkBehaviour`] for Bitswap protocol.
#[derive(Debug)]
pub struct BitswapBehaviour<const MAX_MULTIHASH_SIZE: usize, B>
where
Expand All @@ -44,6 +45,7 @@ where
client: ClientBehaviour<MAX_MULTIHASH_SIZE, B>,
}

/// Event produced by [`BitswapBehaviour`].
#[derive(Debug)]
pub enum BitswapEvent {
GetQueryResponse {
Expand All @@ -56,6 +58,7 @@ pub enum BitswapEvent {
},
}

/// Representation of all the errors that can occur when interacting with this crate.
#[derive(Debug, thiserror::Error)]
pub enum BitswapError {
#[error("Invalid multihash size")]
Expand All @@ -68,20 +71,31 @@ pub enum BitswapError {
Blockstore(#[from] BlockstoreError),
}

/// Alias for a [`Result`] with the error type [`BitswapError`].
pub type Result<T, E = BitswapError> = std::result::Result<T, E>;

impl<const MAX_MULTIHASH_SIZE: usize, B> BitswapBehaviour<MAX_MULTIHASH_SIZE, B>
where
B: Blockstore + Send + Sync + 'static,
{
/// Creates a new [`BitswapBehaviour`] with the default configuration.
pub fn new(blockstore: B) -> BitswapBehaviour<MAX_MULTIHASH_SIZE, B> {
BitswapBehaviourBuilder::new(blockstore).build()
}

/// Creates a new [`BitswapBehaviourBuilder`].
pub fn builder(blockstore: B) -> BitswapBehaviourBuilder<MAX_MULTIHASH_SIZE, B> {
BitswapBehaviourBuilder::new(blockstore)
}

/// Start a query that returns the raw data of a [`Cid`].
///
/// [`Cid`]: cid::CidGeneric
pub fn get<const S: usize>(&mut self, cid: &CidGeneric<S>) -> BitswapQueryId {
self.client.get(cid)
}

/// Cancel an ongoing query.
pub fn cancel(&mut self, query_id: BitswapQueryId) {
self.client.cancel(query_id)
}
Expand Down Expand Up @@ -169,6 +183,7 @@ pub enum ToHandlerEvent {
}

#[derive(Debug)]
#[doc(hidden)]
pub struct BitswapConnectionHandler<const MAX_MULTIHASH_SIZE: usize> {
peer: PeerId,
protocol: StreamProtocol,
Expand Down
3 changes: 2 additions & 1 deletion src/multihasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use multihash_codetable::MultihashDigest;

use crate::utils::convert_multihash;

/// Trait for producing a custom [`Multihash`].
pub trait Multihasher<const S: usize> {
fn digest(&self, multihash_code: u64, input: &[u8]) -> Option<Multihash<S>>;
}

/// Multihasher that uses [`multihash_codetable::Code`]
/// [`Multihasher`] that uses [`multihash_codetable::Code`]
pub struct StandardMultihasher;

impl<const S: usize> Multihasher<S> for StandardMultihasher {
Expand Down
27 changes: 12 additions & 15 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,28 @@ use cid::CidGeneric;
use libp2p_core::multihash::Multihash;
use libp2p_swarm::StreamProtocol;

use crate::{BitswapError, Result};

/// Helper utility that converts `CidGeneric<INPUT_SIZE>` to `CidGeneric<OUTPUT_SIZE>`
pub fn convert_cid<const INPUT_S: usize, const OUTPUT_S: usize>(
cid: &CidGeneric<INPUT_S>,
) -> Option<CidGeneric<OUTPUT_S>> {
/// Helper utility that converts `CidGeneric<S>` to `CidGeneric<NEW_S>`
pub fn convert_cid<const S: usize, const NEW_S: usize>(
cid: &CidGeneric<S>,
) -> Option<CidGeneric<NEW_S>> {
let hash = convert_multihash(cid.hash())?;
CidGeneric::new(cid.version(), cid.codec(), hash).ok()
}

/// Helper utility that converts `Multihash<INPUT_SIZE>` to `Multihash<OUTPUT_SIZE>`
pub fn convert_multihash<const INPUT_S: usize, const OUTPUT_S: usize>(
hash: &Multihash<INPUT_S>,
) -> Option<Multihash<OUTPUT_S>> {
Multihash::<OUTPUT_S>::wrap(hash.code(), hash.digest()).ok()
/// Helper utility that converts `Multihash<S>` to `Multihash<NEW_S>`
pub fn convert_multihash<const S: usize, const NEW_S: usize>(
hash: &Multihash<S>,
) -> Option<Multihash<NEW_S>> {
Multihash::<NEW_S>::wrap(hash.code(), hash.digest()).ok()
}

pub(crate) fn stream_protocol(
prefix: Option<&str>,
protocol: &'static str,
) -> Result<StreamProtocol> {
) -> Option<StreamProtocol> {
match prefix {
Some(prefix) => StreamProtocol::try_from_owned(format!("{prefix}{protocol}"))
.map_err(|_| BitswapError::InvalidProtocolPrefix(prefix.to_owned())),
None => Ok(StreamProtocol::new(protocol)),
Some(prefix) => StreamProtocol::try_from_owned(format!("{prefix}{protocol}")).ok(),
None => Some(StreamProtocol::new(protocol)),
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/wantlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,22 @@ impl<const S: usize> WantlistState<S> {
// Update existing entries
for (cid, req_state) in self.req_state.iter_mut() {
match (wantlist.cids.contains(cid), *req_state) {
// If CID is not in wantlist but we got the block, remove it
// If CID is not in the wantlist that means we received
// its block from another peer. If we received a block
// from this peer too then we don't need to send a cancel
// message back.
(false, WantReqState::GotBlock) => {
// Remove CID request state
removed.push(cid.to_owned());
}

// If CID is not in wantlist, cancel it
// If CID is not in the wantlist that means we received
// its block from another peer. We need to send a cancel
// message to this peer.
(false, _) => {
// Remove CID request state
removed.push(cid.to_owned());
// Add a cancel mesage
entries.push(new_cancel_entry(cid));
}

Expand Down

0 comments on commit 914452f

Please sign in to comment.