Skip to content

Commit

Permalink
chore: Upgrade to blockstore 0.4.0 (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique authored Apr 4, 2024
1 parent 11ea91b commit 56ba3a1
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 34 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ rust-version = "1.75"

[dependencies]
asynchronous-codec = "0.7"
blockstore = "0.3"
blockstore = "0.4"
bytes = "1"
cid = "0.11"
fnv = "1.0.5"
Expand Down
36 changes: 15 additions & 21 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use asynchronous_codec::FramedWrite;
use blockstore::{Blockstore, BlockstoreError};
use cid::CidGeneric;
use fnv::FnvHashMap;
use futures::future::{AbortHandle, Abortable, BoxFuture};
use futures::future::{AbortHandle, Abortable};
use futures::stream::FuturesUnordered;
use futures::task::AtomicWaker;
use futures::{FutureExt, SinkExt, StreamExt};
Expand All @@ -26,7 +26,7 @@ use crate::incoming_stream::ClientMessage;
use crate::message::Codec;
use crate::proto::message::mod_Message::{BlockPresenceType, Wantlist as ProtoWantlist};
use crate::proto::message::Message;
use crate::utils::{convert_cid, stream_protocol};
use crate::utils::{box_future, convert_cid, stream_protocol, BoxFuture};
use crate::wantlist::{Wantlist, WantlistState};
use crate::StreamRequester;
use crate::{Error, Event, Result, ToBehaviourEvent, ToHandlerEvent};
Expand Down Expand Up @@ -162,16 +162,13 @@ where
let (handle, reg) = AbortHandle::new_pair();

// Try to asynchronously get the CID from the store..
self.tasks.push(
async move {
match Abortable::new(store.get(&cid), reg).await {
// ..And continue the procedure in `poll`. Missing CID will be handled there.
Ok(res) => TaskResult::Get(query_id, cid, res),
Err(_) => TaskResult::Cancelled,
}
self.tasks.push(box_future(async move {
match Abortable::new(store.get(&cid), reg).await {
// ..And continue the procedure in `poll`. Missing CID will be handled there.
Ok(res) => TaskResult::Get(query_id, cid, res),
Err(_) => TaskResult::Cancelled,
}
.boxed(),
);
}));

self.query_abort_handle.insert(query_id, handle);
}
Expand All @@ -180,16 +177,13 @@ where
fn schedule_store_put_many(&mut self, blocks: Vec<(CidGeneric<S>, Vec<u8>)>) {
let store = self.store.clone();

self.tasks.push(
async move {
let res = store
.put_many_keyed(blocks.clone().into_iter())
.await
.map(|_| blocks);
TaskResult::Set(res)
}
.boxed(),
);
self.tasks.push(box_future(async move {
let res = store
.put_many_keyed(blocks.clone().into_iter())
.await
.map(|_| blocks);
TaskResult::Set(res)
}));
}

pub(crate) fn get<const CS: usize>(&mut self, cid: &CidGeneric<CS>) -> QueryId {
Expand Down
16 changes: 6 additions & 10 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use asynchronous_codec::FramedWrite;
use blockstore::{Blockstore, BlockstoreError};
use cid::CidGeneric;
use fnv::{FnvHashMap, FnvHashSet};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, SinkExt, StreamExt};
use futures::{SinkExt, StreamExt};
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_identity::PeerId;
use libp2p_swarm::{
Expand All @@ -24,7 +23,7 @@ use crate::message::Codec;
use crate::proto::message::{
mod_Message::Block as ProtoBlock, mod_Message::Wantlist as ProtoWantlist, Message,
};
use crate::utils::stream_protocol;
use crate::utils::{box_future, stream_protocol, BoxFuture};
use crate::{Event, Result, StreamRequester, ToBehaviourEvent, ToHandlerEvent};

type Sink = FramedWrite<libp2p_swarm::Stream, Codec>;
Expand Down Expand Up @@ -153,13 +152,10 @@ where
fn schedule_store_get(&mut self, peer: Arc<PeerId>, cids: Vec<CidGeneric<S>>) {
let store = self.store.clone();

self.tasks.push(
async move {
let result = get_multiple_cids_from_store(store, cids).await;
TaskResult::Get(peer, result)
}
.boxed(),
);
self.tasks.push(box_future(async move {
let result = get_multiple_cids_from_store(store, cids).await;
TaskResult::Get(peer, result)
}));
}

fn cancel_request(&mut self, peer: Arc<PeerId>, cid: CidGeneric<S>) {
Expand Down
25 changes: 25 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Helpers used in and provided by the crate
use std::future::Future;

use cid::CidGeneric;
use futures::FutureExt;
use libp2p_core::multihash::Multihash;
use libp2p_swarm::StreamProtocol;

Expand Down Expand Up @@ -29,6 +32,28 @@ pub(crate) fn stream_protocol(
}
}

#[cfg(not(target_arch = "wasm32"))]
pub(crate) type BoxFuture<'a, T> = futures::future::BoxFuture<'a, T>;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn box_future<'a, F, T>(f: F) -> BoxFuture<'a, T>
where
F: Future<Output = T> + Send + 'a,
{
f.boxed()
}

#[cfg(target_arch = "wasm32")]
pub(crate) type BoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;

#[cfg(target_arch = "wasm32")]
pub(crate) fn box_future<'a, F, T>(f: F) -> BoxFuture<'a, T>
where
F: Future<Output = T> + Sized + 'a,
{
f.boxed_local()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 56ba3a1

Please sign in to comment.