Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add more docs and comments #19

Merged
merged 9 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
WIP
# beetswap

Rust implementation of [`Bitswap`] protocol for [`libp2p`].


[`Bitswap`]: https://specs.ipfs.tech/bitswap-protocolo/
[`libp2p`]: https://docs.rs/libp2p
70 changes: 47 additions & 23 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use blockstore::Blockstore;
use crate::client::{ClientBehaviour, ClientConfig};
use crate::multihasher::{Multihasher, MultihasherTable};
use crate::utils::stream_protocol;
use crate::{BitswapBehaviour, Result};
use crate::{BitswapBehaviour, BitswapError, Result};

/// Builder for [`BitswapBehaviour`].
///
Expand All @@ -14,10 +14,9 @@ use crate::{BitswapBehaviour, Result};
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use beetswap::BitswapBehaviour;
/// # fn main() -> beetswap::Result<()> {
/// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new())
/// .build()?;
/// # Ok(())
/// # fn new() -> BitswapBehaviour<64, InMemoryBlockstore<64>> {
/// BitswapBehaviour::builder(InMemoryBlockstore::new())
/// .build()
/// # }
pub struct BitswapBehaviourBuilder<const S: usize, B>
where
Expand Down Expand Up @@ -47,39 +46,63 @@ where

/// Set a prefix on the protocol name.
///
/// The prefix will be added on `/ipfs/bitswap/1.2.0` and it must start with `/`.
/// The prefix will be added on `/ipfs/bitswap/1.2.0`.
///
/// # Errors
///
/// This function will return an error if `prefix` does not start with a forward slash (`/`).
///
/// # Example
///
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use beetswap::BitswapBehaviour;
/// # fn main() -> beetswap::Result<()> {
/// BitswapBehaviour::<64, _>::builder(InMemoryBlockstore::<64>::new())
/// .protocol_prefix("/celestia/celestia")
/// .build()?;
/// # Ok(())
/// # fn new() -> beetswap::Result<BitswapBehaviour<64, InMemoryBlockstore<64>>> {
/// # Ok(
/// BitswapBehaviour::builder(InMemoryBlockstore::new())
/// .protocol_prefix("/celestia/celestia")?
/// .build()
/// # )
/// # }
/// ```
pub fn protocol_prefix(mut self, prefix: &str) -> Self {
pub fn protocol_prefix(mut self, prefix: &str) -> Result<Self> {
if prefix.starts_with('/') {
return Err(BitswapError::InvalidProtocolPrefix(prefix.to_owned()));
}

self.protocol_prefix = Some(prefix.to_owned());
self
Ok(self)
}

/// Client will set `send_dont_have` flag on each query.
/// Client will set `send_dont_have` flag on each query (enabled by default).
///
/// # Example
///
/// ```rust,no_run
/// # use blockstore::InMemoryBlockstore;
/// # use beetswap::BitswapBehaviour;
/// # fn new() -> BitswapBehaviour<64, InMemoryBlockstore<64>> {
/// BitswapBehaviour::builder(InMemoryBlockstore::new())
/// .client_set_send_dont_have(false)
/// .build()
/// # }
pub fn client_set_send_dont_have(mut self, enable: bool) -> Self {
self.client.set_send_dont_have = enable;
self
}

/// Register extra [`Multihasher`].
/// Register an extra [`Multihasher`].
///
/// Every registration adds new hasher to [`BitswapBehaviour`]. Hashers are used to
/// reconstruct the [`Cid`] from the received data. `BitswapBehaviour` will try them
/// in the reverse order they were registered until some successfully constructs
oblique marked this conversation as resolved.
Show resolved Hide resolved
/// [`Multihash`].
///
/// Every registration adds new hasher to `BitswapBehavior`. Hashers are used to reconstruct the `Cid` from the received data.
/// `BitswapBehavior` will try them in the reverse order they were registered until some successfully constructs `Multihash`.
/// By default `BitswapBehaviourBuilder` is pre-loaded with [`StandardMultihasher`].

///
/// [`StandardMultihasher`]: crate::multihasher::StandardMultihasher
/// [`Cid`]: cid::CidGeneric
/// [`Multihash`]: libp2p_core::multihash::Multihash
pub fn register_multihasher<M>(mut self, multihasher: M) -> Self
where
M: Multihasher<S> + Send + Sync + 'static,
Expand All @@ -89,14 +112,15 @@ where
}

/// Build a [`BitswapBehaviour`].
pub fn build(self) -> Result<BitswapBehaviour<S, B>> {
pub fn build(self) -> BitswapBehaviour<S, B> {
let blockstore = Arc::new(self.blockstore);
let multihasher = Arc::new(self.multihasher);
let protocol_prefix = self.protocol_prefix.as_deref();

Ok(BitswapBehaviour {
protocol: stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?,
client: ClientBehaviour::new(self.client, blockstore, multihasher, protocol_prefix)?,
})
BitswapBehaviour {
protocol: stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")
.expect("prefix checked by BitswapBehaviourBuilder::protocol_prefix"),
client: ClientBehaviour::new(self.client, blockstore, multihasher, protocol_prefix),
}
}
}
19 changes: 13 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{BitswapError, BitswapEvent, Result, ToBehaviourEvent, ToHandlerEvent

const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30);

/// ID of an ongoing query.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BitswapQueryId(u64);

Expand Down Expand Up @@ -102,11 +103,12 @@ where
store: Arc<B>,
multihasher: Arc<MultihasherTable<S>>,
protocol_prefix: Option<&str>,
) -> Result<Self> {
let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")?;
) -> Self {
let protocol = stream_protocol(protocol_prefix, "/ipfs/bitswap/1.2.0")
.expect("prefix checked by BitswapBehaviourBuilder::protocol_prefix");
let set_send_dont_have = config.set_send_dont_have;

Ok(ClientBehaviour {
ClientBehaviour {
store,
protocol,
queue: VecDeque::new(),
Expand All @@ -119,7 +121,7 @@ where
waker: Arc::new(AtomicWaker::new()),
multihasher,
send_full_timer: Delay::new(SEND_FULL_INTERVAL),
})
}
}

pub(crate) fn new_connection_handler(&mut self, peer: PeerId) -> ClientConnectionHandler<S> {
Expand Down Expand Up @@ -195,7 +197,11 @@ where
let query_id = self.next_query_id();

match convert_cid(cid) {
// Schedule an asynchronous get from blockstor. The result will provided
oblique marked this conversation as resolved.
Show resolved Hide resolved
// from `pool` and if CID is missing `pool` will query the network.
oblique marked this conversation as resolved.
Show resolved Hide resolved
Some(cid) => self.schedule_store_get(query_id, cid),
// In failure to convert CID an event with the error will be given to
// the requestor on the next `pool`.
None => {
self.queue
.push_back(ToSwarm::GenerateEvent(BitswapEvent::GetQueryError {
Expand All @@ -217,7 +223,8 @@ where
if let Some(pos) = queries.iter().position(|id| *id == query_id) {
queries.swap_remove(pos);

// If CID doesn't have any other queries requesting it, remove it completely
// If CID doesn't have any other queries requesting it, remove it completely.
// Cancel message will be send to the servers on from `pool`.
oblique marked this conversation as resolved.
Show resolved Hide resolved
if queries.is_empty() {
// Cancelling message will be generated from `pool` method
let cid = cid.to_owned();
Expand Down Expand Up @@ -918,7 +925,7 @@ mod tests {
async fn new_client() -> ClientBehaviour<64, InMemoryBlockstore<64>> {
let store = blockstore().await;
let multihasher = Arc::new(MultihasherTable::<64>::new());
ClientBehaviour::<64, _>::new(ClientConfig::default(), store, multihasher, None).unwrap()
ClientBehaviour::<64, _>::new(ClientConfig::default(), store, multihasher, None)
}

fn expect_send_wantlist_event(
Expand Down
15 changes: 15 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::proto::message::Message;
pub use crate::builder::BitswapBehaviourBuilder;
pub use crate::client::BitswapQueryId;

/// [`NetworkBehaviour`] for Bitswap protocol.
#[derive(Debug)]
pub struct BitswapBehaviour<const MAX_MULTIHASH_SIZE: usize, B>
where
Expand All @@ -44,6 +45,7 @@ where
client: ClientBehaviour<MAX_MULTIHASH_SIZE, B>,
}

/// Event produced by [`BitswapBehaviour`].
#[derive(Debug)]
pub enum BitswapEvent {
GetQueryResponse {
Expand All @@ -56,6 +58,7 @@ pub enum BitswapEvent {
},
}

/// Representation of all the errors that can occur when interacting with this crate.
#[derive(Debug, thiserror::Error)]
pub enum BitswapError {
#[error("Invalid multihash size")]
Expand All @@ -68,20 +71,31 @@ pub enum BitswapError {
Blockstore(#[from] BlockstoreError),
}

/// Alias for a [`Result`] with the error type [`BitswapError`].
pub type Result<T, E = BitswapError> = std::result::Result<T, E>;

impl<const MAX_MULTIHASH_SIZE: usize, B> BitswapBehaviour<MAX_MULTIHASH_SIZE, B>
where
B: Blockstore + Send + Sync + 'static,
{
/// Creates a new [`BitswapBehaviour`] with the default configuration.
pub fn new(blockstore: B) -> BitswapBehaviour<MAX_MULTIHASH_SIZE, B> {
BitswapBehaviourBuilder::new(blockstore).build()
}

/// Creates a new [`BitswapBehaviourBuilder`].
pub fn builder(blockstore: B) -> BitswapBehaviourBuilder<MAX_MULTIHASH_SIZE, B> {
BitswapBehaviourBuilder::new(blockstore)
}

/// Start a query that returns the raw data of a [`Cid`].
///
/// [`Cid`]: cid::CidGeneric
pub fn get<const S: usize>(&mut self, cid: &CidGeneric<S>) -> BitswapQueryId {
self.client.get(cid)
}

/// Cancel an ongoing query.
pub fn cancel(&mut self, query_id: BitswapQueryId) {
self.client.cancel(query_id)
}
Expand Down Expand Up @@ -169,6 +183,7 @@ pub enum ToHandlerEvent {
}

#[derive(Debug)]
#[doc(hidden)]
pub struct BitswapConnectionHandler<const MAX_MULTIHASH_SIZE: usize> {
peer: PeerId,
protocol: StreamProtocol,
Expand Down
3 changes: 2 additions & 1 deletion src/multihasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use multihash_codetable::MultihashDigest;

use crate::utils::convert_multihash;

/// Trait for producing a custom [`Multihash`].
pub trait Multihasher<const S: usize> {
fn digest(&self, multihash_code: u64, input: &[u8]) -> Option<Multihash<S>>;
}

/// Multihasher that uses [`multihash_codetable::Code`]
/// [`Multihasher`] that uses [`multihash_codetable::Code`]
pub struct StandardMultihasher;

impl<const S: usize> Multihasher<S> for StandardMultihasher {
Expand Down
27 changes: 12 additions & 15 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,28 @@ use cid::CidGeneric;
use libp2p_core::multihash::Multihash;
use libp2p_swarm::StreamProtocol;

use crate::{BitswapError, Result};

/// Helper utility that converts `CidGeneric<INPUT_SIZE>` to `CidGeneric<OUTPUT_SIZE>`
pub fn convert_cid<const INPUT_S: usize, const OUTPUT_S: usize>(
cid: &CidGeneric<INPUT_S>,
) -> Option<CidGeneric<OUTPUT_S>> {
/// Helper utility that converts `CidGeneric<S>` to `CidGeneric<NEW_S>`
pub fn convert_cid<const S: usize, const NEW_S: usize>(
cid: &CidGeneric<S>,
) -> Option<CidGeneric<NEW_S>> {
let hash = convert_multihash(cid.hash())?;
CidGeneric::new(cid.version(), cid.codec(), hash).ok()
}

/// Helper utility that converts `Multihash<INPUT_SIZE>` to `Multihash<OUTPUT_SIZE>`
pub fn convert_multihash<const INPUT_S: usize, const OUTPUT_S: usize>(
hash: &Multihash<INPUT_S>,
) -> Option<Multihash<OUTPUT_S>> {
Multihash::<OUTPUT_S>::wrap(hash.code(), hash.digest()).ok()
/// Helper utility that converts `Multihash<S>` to `Multihash<NEW_S>`
pub fn convert_multihash<const S: usize, const NEW_S: usize>(
hash: &Multihash<S>,
) -> Option<Multihash<NEW_S>> {
Multihash::<NEW_S>::wrap(hash.code(), hash.digest()).ok()
}

pub(crate) fn stream_protocol(
prefix: Option<&str>,
protocol: &'static str,
) -> Result<StreamProtocol> {
) -> Option<StreamProtocol> {
match prefix {
Some(prefix) => StreamProtocol::try_from_owned(format!("{prefix}{protocol}"))
.map_err(|_| BitswapError::InvalidProtocolPrefix(prefix.to_owned())),
None => Ok(StreamProtocol::new(protocol)),
Some(prefix) => StreamProtocol::try_from_owned(format!("{prefix}{protocol}")).ok(),
None => Some(StreamProtocol::new(protocol)),
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/wantlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,22 @@ impl<const S: usize> WantlistState<S> {
// Update existing entries
for (cid, req_state) in self.req_state.iter_mut() {
match (wantlist.cids.contains(cid), *req_state) {
// If CID is not in wantlist but we got the block, remove it
// If CID is not in the wantlist that means we received
// its block from another peer. If we received a block
// from this peer too then we don't need to send cancel
oblique marked this conversation as resolved.
Show resolved Hide resolved
// message back.
(false, WantReqState::GotBlock) => {
// Remove CID request state
removed.push(cid.to_owned());
}

// If CID is not in wantlist, cancel it
// If CID is not in the wantlist that means we received
// its block from another peer. We need to send cancel
oblique marked this conversation as resolved.
Show resolved Hide resolved
// message to this peer.
(false, _) => {
// Remove CID request state
removed.push(cid.to_owned());
// Add a cancel mesage
entries.push(new_cancel_entry(cid));
}

Expand Down
Loading