From 58671a49ed3f809e18c8482e00298c3eaa342314 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Wed, 3 Jul 2024 21:18:53 -0400 Subject: [PATCH] refactor: Remove sessions and redundant code (#255) --- CHANGELOG.md | 1 + Cargo.lock | 3 +- Cargo.toml | 2 - examples/dag_creation.rs | 5 +- src/dag.rs | 42 +++---------- src/lib.rs | 4 +- src/refs.rs | 2 +- src/repo/mod.rs | 32 +++------- src/task.rs | 24 ++------ src/unixfs/add.rs | 65 +++++++++----------- src/unixfs/cat.rs | 130 ++++++++++++++------------------------- src/unixfs/get.rs | 72 ++++++++-------------- src/unixfs/ls.rs | 23 ++----- src/unixfs/mod.rs | 15 +++-- unixfs/Cargo.toml | 2 +- 15 files changed, 144 insertions(+), 278 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c51cd116..6bf13e002 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - refactor: Remove redundant async signature. [PR 247](https://github.com/dariusc93/rust-ipfs/pull/247) - refactor: Add `Serialize` for {Ipfs, IpldDag)::put_dag. [PR 249](https://github.com/dariusc93/rust-ipfs/pull/249) - feat: Add `Ipfs::{dhy_get_providers,dht_provide)`, supporting `RecordKey` directly. [PR 250](https://github.com/dariusc93/rust-ipfs/pull/250) +- refactor: Remove sessions and redundant code. [PR 255](https://github.com/dariusc93/rust-ipfs/pull/255) # 0.11.20 - feat: Add Ipfs::{add,remove}_external_address. diff --git a/Cargo.lock b/Cargo.lock index df2a8076e..af37b075a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4782,7 +4782,6 @@ dependencies = [ "unsigned-varint 0.7.2", "void", "wasm-bindgen-futures", - "wasm-timer", "web-time", "zeroize", ] @@ -4808,7 +4807,7 @@ dependencies = [ name = "rust-unixfs" version = "0.4.1" dependencies = [ - "criterion 0.4.0", + "criterion 0.5.1", "either", "filetime", "hex-literal", diff --git a/Cargo.toml b/Cargo.toml index 3555c3410..c1f25dab6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,6 @@ tokio-util = { default-features = false, version = "0.7" } unsigned-varint = { version = "0.7.1", features = ["asynchronous_codec"] } void = { default-features = false, version = "1" } wasm-bindgen-futures = { version = "0.4" } -wasm-timer = "0.2" web-time = "1.1.0" zeroize = "1" @@ -125,7 +124,6 @@ tracing = { default-features = false, features = ["log"], workspace = true } tracing-futures = { workspace = true } unsigned-varint.workspace = true void = { default-features = false, workspace = true } -wasm-timer.workspace = true web-time.workspace = true zeroize.workspace = true diff --git a/examples/dag_creation.rs b/examples/dag_creation.rs index f9df6434d..e7bc89c01 100644 --- a/examples/dag_creation.rs +++ b/examples/dag_creation.rs @@ -8,10 +8,7 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); // Initialize the repo and start a daemon - let ipfs: Ipfs = UninitializedIpfs::new() - .add_listening_addr("/ip4/0.0.0.0/tcp/0".parse()?) - .start() - .await?; + let ipfs: Ipfs = UninitializedIpfs::new().start().await?; // Create a DAG let cid1 = ipfs.put_dag(ipld!("block1")).await?; diff --git a/src/dag.rs b/src/dag.rs index 3fd903594..bc5af6f32 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -227,9 +227,8 @@ impl IpldDag { DagGet::new(self.clone()) } - pub(crate) async fn get_with_session( + pub(crate) async fn _get( &self, - session: Option, path: IpfsPath, providers: &[PeerId], local_only: bool, @@ -256,9 +255,7 @@ impl IpldDag { let mut iter = resolved_path.iter().peekable(); let (node, _) = match self - .resolve0( - session, cid, &mut iter, true, providers, local_only, timeout, - ) + .resolve0(cid, &mut iter, true, providers, local_only, timeout) .await { Ok(t) => t, @@ -289,13 +286,12 @@ impl IpldDag { providers: &[PeerId], local_only: bool, ) -> Result<(ResolvedNode, SlashedPath), ResolveError> { - self.resolve_with_session(None, path, follow_links, providers, local_only, None) + self._resolve(path, follow_links, providers, local_only, None) .await } - pub(crate) async fn resolve_with_session( + pub(crate) async fn _resolve( &self, - session: Option, path: IpfsPath, follow_links: bool, providers: &[PeerId], @@ -323,15 +319,7 @@ impl IpldDag { let (node, matched_segments) = { let mut iter = resolved_path.iter().peekable(); match self - .resolve0( - session, - cid, - &mut iter, - follow_links, - providers, - local_only, - timeout, - ) + .resolve0(cid, &mut iter, follow_links, providers, local_only, timeout) .await { Ok(t) => t, @@ -354,7 +342,6 @@ impl IpldDag { #[allow(clippy::too_many_arguments)] async fn resolve0<'a>( &self, - session: Option, cid: &Cid, segments: &mut Peekable>, follow_links: bool, @@ -372,7 +359,7 @@ impl IpldDag { loop { let block = match self .repo - .get_block_with_session(session, ¤t, providers, local_only, timeout) + ._get_block(¤t, providers, local_only, timeout) .await { Ok(block) => block, @@ -443,7 +430,6 @@ impl IpldDag { #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct DagGet { dag_ipld: IpldDag, - session: Option, path: Option, providers: Vec, local: bool, @@ -455,7 +441,6 @@ impl DagGet { pub fn new(dag: IpldDag) -> Self { Self { dag_ipld: dag, - session: None, path: None, providers: vec![], local: false, @@ -464,13 +449,6 @@ impl DagGet { } } - /// Bitswap session - #[allow(dead_code)] - pub(crate) fn session(mut self, session: u64) -> Self { - self.session = Some(session); - self - } - /// Path to object pub fn path>(mut self, path: P) -> Self { let path = path.into(); @@ -535,13 +513,7 @@ impl std::future::IntoFuture for DagGet { async move { let path = self.path.ok_or(ResolveError::PathNotProvided)?; self.dag_ipld - .get_with_session( - self.session, - path, - &self.providers, - self.local, - self.timeout, - ) + ._get(path, &self.providers, self.local, self.timeout) .await } .instrument(span) diff --git a/src/lib.rs b/src/lib.rs index 99b26393f..345dcea76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,7 +75,7 @@ use std::{ fmt, ops::{Deref, DerefMut}, path::Path, - sync::{atomic::AtomicU64, Arc}, + sync::Arc, time::Duration, }; @@ -126,8 +126,6 @@ use libp2p::{ pub use libp2p_connection_limits::ConnectionLimits; use serde::Serialize; -pub(crate) static BITSWAP_ID: AtomicU64 = AtomicU64::new(1); - #[allow(dead_code)] #[deprecated(note = "Use `StoreageType` instead")] type StoragePath = StorageType; diff --git a/src/refs.rs b/src/refs.rs index 104505b3f..01af9149a 100644 --- a/src/refs.rs +++ b/src/refs.rs @@ -224,7 +224,7 @@ where let borrowed = repo.borrow(); let block = if download_blocks { - match borrowed.get_block_with_session(None, &cid, &providers, !download_blocks, timeout).await { + match borrowed._get_block(&cid, &providers, !download_blocks, timeout).await { Ok(block) => block, Err(e) => { warn!("failed to load {}, linked from {}: {}", cid, source, e); diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 7d528ea44..77663157f 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -345,7 +345,7 @@ pub(crate) struct RepoInner { #[derive(Debug)] pub enum RepoEvent { /// Signals a desired block. - WantBlock(Option, Vec, Vec), + WantBlock(Vec, Vec), /// Signals a desired block is no longer wanted. UnwantBlock(Cid), /// Signals the posession of a new block. @@ -616,8 +616,7 @@ impl Repo { peers: &[PeerId], local_only: bool, ) -> Result { - self.get_block_with_session(None, cid, peers, local_only, None) - .await + self._get_block(cid, peers, local_only, None).await } /// Retrives a set of blocks from the block store, or starts fetching them from the network and awaits @@ -629,8 +628,7 @@ impl Repo { peers: &[PeerId], local_only: bool, ) -> Result>, Error> { - self.get_blocks_with_session(None, cids, peers, local_only, None) - .await + self._get_blocks(cids, peers, local_only, None).await } /// Get the size of listed blocks @@ -645,9 +643,8 @@ impl Repo { self.inner.block_store.total_size().await } - pub(crate) async fn get_blocks_with_session( + pub(crate) async fn _get_blocks( &self, - session: impl Into>, cids: &[Cid], peers: &[PeerId], local_only: bool, @@ -713,29 +710,22 @@ impl Repo { } events - .send(RepoEvent::WantBlock( - session.into(), - missing, - peers.to_vec(), - )) + .send(RepoEvent::WantBlock(missing, peers.to_vec())) .await .ok(); Ok(blocks.boxed()) } - pub(crate) async fn get_block_with_session( + pub(crate) async fn _get_block( &self, - session: impl Into>, cid: &Cid, peers: &[PeerId], local_only: bool, timeout: impl Into>, ) -> Result { let cids = vec![*cid]; - let mut blocks = self - .get_blocks_with_session(session, &cids, peers, local_only, timeout) - .await?; + let mut blocks = self._get_blocks(&cids, peers, local_only, timeout).await?; blocks .next() @@ -1124,9 +1114,7 @@ impl std::future::IntoFuture for RepoFetch { async move { // Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future let _g = repo.inner.gclock.read().await; - let block = repo - .get_block_with_session(None, &cid, &providers, false, timeout) - .await?; + let block = repo._get_block(&cid, &providers, false, timeout).await?; if !recursive { return Ok(()); @@ -1255,9 +1243,7 @@ impl std::future::IntoFuture for RepoInsertPin { async move { // Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future let _g = repo.inner.gclock.read().await; - let block = repo - .get_block_with_session(None, &cid, &providers, local, timeout) - .await?; + let block = repo._get_block(&cid, &providers, local, timeout).await?; if !recursive { repo.insert_direct_pin(&cid).await? diff --git a/src/task.rs b/src/task.rs index e95b1c09b..30cb32ce3 100644 --- a/src/task.rs +++ b/src/task.rs @@ -12,13 +12,12 @@ use futures::{ use crate::TSwarmEvent; use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent}; -use wasm_timer::Interval; - use std::{ collections::{hash_map::Entry, HashMap, HashSet}, time::Duration, }; +use futures_timer::Delay; use std::pin::Pin; use std::task::{Context, Poll}; @@ -60,15 +59,12 @@ pub(crate) struct IpfsTask> { pub(crate) from_facade: Fuse>, pub(crate) listening_addresses: HashMap>, pub(crate) provider_stream: HashMap>, - pub(crate) bitswap_provider_stream: - HashMap, String>>>, pub(crate) record_stream: HashMap>, pub(crate) repo: Repo, pub(crate) kad_subscriptions: HashMap>, pub(crate) dht_peer_lookup: HashMap>>, pub(crate) bootstraps: HashSet, pub(crate) swarm_event: Option>, - pub(crate) bitswap_sessions: HashMap, pub(crate) pubsub_event_stream: Vec>, pub(crate) timer: TaskTimer, pub(crate) local_external_addr: bool, @@ -96,10 +92,8 @@ impl> IpfsTask { from_facade, swarm, provider_stream: HashMap::new(), - bitswap_provider_stream: Default::default(), record_stream: HashMap::new(), dht_peer_lookup: Default::default(), - bitswap_sessions: Default::default(), pubsub_event_stream: Default::default(), kad_subscriptions: Default::default(), repo: repo.clone(), @@ -121,12 +115,12 @@ impl> IpfsTask { } pub(crate) struct TaskTimer { - pub(crate) event_cleanup: Interval, + pub(crate) event_cleanup: Delay, } impl Default for TaskTimer { fn default() -> Self { - let event_cleanup = Interval::new(Duration::from_secs(60)); + let event_cleanup = Delay::new(Duration::from_secs(60)); Self { event_cleanup } } @@ -158,8 +152,9 @@ impl> futures::Future for IpfsTask } } - if self.timer.event_cleanup.poll_next_unpin(cx).is_ready() { + if self.timer.event_cleanup.poll_unpin(cx).is_ready() { self.pubsub_event_stream.retain(|ch| !ch.is_closed()); + self.timer.event_cleanup.reset(Duration::from_secs(60)); } Poll::Pending @@ -168,7 +163,6 @@ impl> futures::Future for IpfsTask impl> IpfsTask { pub(crate) async fn run(&mut self) { - let mut session_cleanup = futures_timer::Delay::new(Duration::from_secs(5 * 60)); let mut event_cleanup = futures_timer::Delay::new(Duration::from_secs(60)); loop { @@ -190,9 +184,6 @@ impl> IpfsTask { self.pubsub_event_stream.retain(|ch| !ch.is_closed()); event_cleanup.reset(Duration::from_secs(60)); } - _ = &mut session_cleanup => { - session_cleanup.reset(Duration::from_secs(5 * 60)); - } } } } @@ -435,9 +426,6 @@ impl> IpfsTask { if let Some(tx) = self.provider_stream.remove(&id) { tx.close_channel(); } - if let Some(tx) = self.bitswap_provider_stream.remove(&id) { - drop(tx); - } } } GetProviders(Err(GetProvidersError::Timeout { key, .. })) => { @@ -1553,7 +1541,7 @@ impl> IpfsTask { fn handle_repo_event(&mut self, event: RepoEvent) { match event { - RepoEvent::WantBlock(_, cids, peers) => { + RepoEvent::WantBlock(cids, peers) => { let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else { return; }; diff --git a/src/unixfs/add.rs b/src/unixfs/add.rs index b06fa5392..e14034d3b 100644 --- a/src/unixfs/add.rs +++ b/src/unixfs/add.rs @@ -1,7 +1,7 @@ -use std::{ - path::{Path, PathBuf}, - task::Poll, -}; +use std::task::Poll; + +#[cfg(not(target_arch = "wasm32"))] +use std::path::{Path, PathBuf}; use crate::{repo::Repo, Block}; use bytes::Bytes; @@ -19,30 +19,33 @@ use tracing::{Instrument, Span}; use crate::{Ipfs, IpfsPath}; -use super::{StatusStreamState, UnixfsStatus}; +use super::UnixfsStatus; pub enum AddOpt { + #[cfg(not(target_arch = "wasm32"))] File(PathBuf), Stream { name: Option, total: Option, - stream: BoxStream<'static, std::result::Result>, + stream: BoxStream<'static, Result>, }, } +#[cfg(not(target_arch = "wasm32"))] impl From for AddOpt { fn from(path: PathBuf) -> Self { AddOpt::File(path) } } +#[cfg(not(target_arch = "wasm32"))] impl From<&Path> for AddOpt { fn from(path: &Path) -> Self { AddOpt::File(path.to_path_buf()) } } -#[must_use = "do nothing unless you `.await` or poll the stream"] +#[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsAdd { core: Option>, opt: Option, @@ -51,7 +54,7 @@ pub struct UnixfsAdd { pin: bool, provide: bool, wrap: bool, - stream: StatusStreamState, + stream: Option>, } impl UnixfsAdd { @@ -73,7 +76,7 @@ impl UnixfsAdd { pin: true, provide: false, wrap: false, - stream: StatusStreamState::None, + stream: None, } } @@ -111,7 +114,7 @@ impl Stream for UnixfsAdd { ) -> std::task::Poll> { loop { match &mut self.stream { - StatusStreamState::None => { + None => { let (ipfs, repo) = match self.core.take().expect("ipfs or repo is used") { Either::Left(ipfs) => { let repo = ipfs.repo().clone(); @@ -148,11 +151,6 @@ impl Stream for UnixfsAdd { return; } }, - #[cfg(target_arch = "wasm32")] - AddOpt::File(_) => { - yield UnixfsStatus::FailedStatus { written, total_size: None, error: Some(anyhow::anyhow!("unimplemented")) }; - return; - }, AddOpt::Stream { name, total, stream } => (name, total, stream), }; @@ -287,29 +285,24 @@ impl Stream for UnixfsAdd { yield UnixfsStatus::CompletedStatus { path, written, total_size } }; - self.stream = StatusStreamState::Pending { - stream: stream.boxed(), - }; + self.stream = Some(stream.boxed()); } - StatusStreamState::Pending { stream } => { - match futures::ready!(stream.poll_next_unpin(cx)) { - Some(item) => { - if matches!( - item, - UnixfsStatus::FailedStatus { .. } - | UnixfsStatus::CompletedStatus { .. } - ) { - self.stream = StatusStreamState::Done; - } - return Poll::Ready(Some(item)); - } - None => { - self.stream = StatusStreamState::Done; - return Poll::Ready(None); + Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) { + Some(item) => { + if matches!( + item, + UnixfsStatus::FailedStatus { .. } + | UnixfsStatus::CompletedStatus { .. } + ) { + self.stream.take(); } + return Poll::Ready(Some(item)); } - } - StatusStreamState::Done => return Poll::Ready(None), + None => { + self.stream.take(); + return Poll::Ready(None); + } + }, } } } @@ -341,6 +334,6 @@ impl std::future::IntoFuture for UnixfsAdd { impl FusedStream for UnixfsAdd { fn is_terminated(&self) -> bool { - matches!(self.stream, StatusStreamState::Done) && self.core.is_none() + self.stream.is_none() && self.core.is_none() } } diff --git a/src/unixfs/cat.rs b/src/unixfs/cat.rs index 4a7e32de9..1b3479e96 100644 --- a/src/unixfs/cat.rs +++ b/src/unixfs/cat.rs @@ -1,5 +1,5 @@ use crate::{dag::IpldDag, repo::Repo, Block, Ipfs}; -use async_stream::stream; +use async_stream::try_stream; use bytes::Bytes; use either::Either; use futures::future::BoxFuture; @@ -9,7 +9,7 @@ use libp2p::PeerId; use rust_unixfs::file::visit::IdleFileVisit; use std::ops::Range; use std::task::Poll; -use std::{borrow::Borrow, time::Duration}; +use std::time::Duration; use tracing::{Instrument, Span}; use super::TraversalFailed; @@ -20,7 +20,7 @@ use super::TraversalFailed; /// be helpful in some contexts, like the http. /// /// Returns a stream of bytes on the file pointed with the Cid. -#[must_use = "do nothing unless you `.await` or poll the stream"] +#[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsCat { core: Option>, span: Span, @@ -139,20 +139,9 @@ impl Stream for UnixfsCat { return Poll::Ready(None); }; - let (repo, dag, session) = match core { - Either::Left(ipfs) => ( - ipfs.repo().clone(), - ipfs.dag(), - Some( - crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst), - ), - ), - Either::Right(repo) => { - let session = repo.is_online().then(|| { - crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) - }); - (repo.clone(), IpldDag::from(repo.clone()), session) - } + let (repo, dag) = match core { + Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()), + Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())), }; let mut visit = IdleFileVisit::default(); @@ -170,59 +159,46 @@ impl Stream for UnixfsCat { // using async_stream here at least to get on faster; writing custom streams is not too easy // but this might be easy enough to write open. - let stream = stream! { + let stream = try_stream! { // Get the root block to start the traversal. The stream does not expose any of the file // metadata. To get to it the user needs to create a Visitor over the first block. let block = match starting_point { - StartingPoint::Left(path) => match dag - .resolve_with_session(session, path.clone(), true, &providers, local_only, timeout) + StartingPoint::Left(path) => dag + ._resolve(path.clone(), true, &providers, local_only, timeout) .await .map_err(TraversalFailed::Resolving) .and_then(|(resolved, _)| { resolved.into_unixfs_block().map_err(TraversalFailed::Path) - }) { - Ok(block) => block, - Err(e) => { - yield Err(e); - return; - } - }, + })?, StartingPoint::Right(block) => block, }; let mut cache = None; + let mut size = 0; + // Start the visit from the root block. We need to move the both components as Options into the // stream as we can't yet return them from this Future context. - let (visit, bytes) = match visit.start(block.data()) { - Ok((bytes, _, _, visit)) => { - let bytes = if !bytes.is_empty() { - Some(Bytes::copy_from_slice(bytes)) - } else { - None - }; - - (visit, bytes) - } - Err(e) => { - yield Err(TraversalFailed::Walking(*block.cid(), e)); - return; + let (visit, bytes) = visit.start(block.data()).map(|(bytes, _, _, visit)| { + let bytes = (!bytes.is_empty()).then(|| Bytes::copy_from_slice(bytes)); + (visit, bytes) + }).map_err(|e| { + TraversalFailed::Walking(*block.cid(), e) + }).and_then(|(visit, bytes)| { + if let Some(bytes) = &bytes { + size += bytes.len(); + if let Some(length) = length { + if size > length { + return Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length }); + } + } } - }; + Ok::<_, TraversalFailed>((visit, bytes)) + })?; - let mut size = 0; if let Some(bytes) = bytes { - size += bytes.len(); - if let Some(length) = length { - if size > length { - yield Err(TraversalFailed::MaxLengthExceeded { - size, length - }); - return; - } - } - yield Ok(bytes); + yield bytes; } let mut visit = match visit { @@ -238,42 +214,30 @@ impl Stream for UnixfsCat { // going. Not that we have any "operation" concept of the Want yet. let (next, _) = visit.pending_links(); - let borrow = repo.borrow(); - let block = match borrow.get_block_with_session(session, next, &providers, local_only, timeout).await { - Ok(block) => block, - Err(e) => { - yield Err(TraversalFailed::Loading(*next, e)); - return; - }, - }; - - match visit.continue_walk(block.data(), &mut cache) { - Ok((bytes, next_visit)) => { - size += bytes.len(); - - if let Some(length) = length { - if size > length { - yield Err(TraversalFailed::MaxLengthExceeded { - size, length - }); - return; - } - } + let borrow = &repo; + let block = borrow._get_block(next, &providers, local_only, timeout).await.map_err(|e| TraversalFailed::Loading(*next, e))?; - if !bytes.is_empty() { - yield Ok(Bytes::copy_from_slice(bytes)); - } + let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?; - match next_visit { - Some(v) => visit = v, - None => return, - } - } - Err(e) => { - yield Err(TraversalFailed::Walking(*block.cid(), e)); + size += bytes.len(); + + if let Some(length) = length { + if size > length { + let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length }); + fn_err()?; return; } } + + if !bytes.is_empty() { + yield Bytes::copy_from_slice(bytes); + } + + match next_visit { + Some(v) => visit = v, + None => return, + } + } }.boxed(); diff --git a/src/unixfs/get.rs b/src/unixfs/get.rs index 761dced49..d5ac9b3f9 100644 --- a/src/unixfs/get.rs +++ b/src/unixfs/get.rs @@ -5,6 +5,7 @@ use std::{ }; use either::Either; +use futures::stream::BoxStream; use futures::{future::BoxFuture, stream::FusedStream, FutureExt, Stream, StreamExt}; use libp2p::PeerId; #[allow(unused_imports)] @@ -16,9 +17,9 @@ use tracing::{Instrument, Span}; use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath}; #[allow(unused_imports)] -use super::{StatusStreamState, TraversalFailed, UnixfsStatus}; +use super::{TraversalFailed, UnixfsStatus}; -#[must_use = "do nothing unless you `.await` or poll the stream"] +#[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsGet { core: Option>, dest: PathBuf, @@ -27,7 +28,7 @@ pub struct UnixfsGet { providers: Vec, local_only: bool, timeout: Option, - stream: StatusStreamState, + stream: Option>, } impl UnixfsGet { @@ -54,7 +55,7 @@ impl UnixfsGet { providers: Vec::new(), local_only: false, timeout: None, - stream: StatusStreamState::None, + stream: None, } } @@ -99,22 +100,10 @@ impl Stream for UnixfsGet { ) -> std::task::Poll> { loop { match &mut self.stream { - StatusStreamState::None => { - let (repo, dag, session) = match self.core.take().expect("ipfs or repo is used") - { - Either::Left(ipfs) => ( - ipfs.repo().clone(), - ipfs.dag(), - Some( - crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst), - ), - ), - Either::Right(repo) => { - let session = repo.is_online().then(|| { - crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) - }); - (repo.clone(), IpldDag::from(repo.clone()), session) - } + None => { + let (repo, dag) = match self.core.take().expect("ipfs or repo is used") { + Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()), + Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())), }; let path = self.path.take().expect("starting point exist"); @@ -126,7 +115,6 @@ impl Stream for UnixfsGet { #[cfg(not(target_arch = "wasm32"))] let stream = async_stream::stream! { - let mut cache = None; let mut total_size = None; let mut written = 0; @@ -142,7 +130,7 @@ impl Stream for UnixfsGet { }; let block = match dag - .resolve_with_session(session, path.clone(), true, &providers, local_only, timeout) + ._resolve(path.clone(), true, &providers, local_only, timeout) .await .map_err(TraversalFailed::Resolving) .and_then(|(resolved, _)| resolved.into_unixfs_block().map_err(TraversalFailed::Path)) { @@ -160,7 +148,7 @@ impl Stream for UnixfsGet { while walker.should_continue() { let (next, _) = walker.pending_links(); - let block = match repo.get_block_with_session(session, next, &providers, local_only, timeout).await { + let block = match repo._get_block(next, &providers, local_only, timeout).await { Ok(block) => block, Err(e) => { yield UnixfsStatus::FailedStatus { written, total_size, error: Some(e) }; @@ -221,7 +209,6 @@ impl Stream for UnixfsGet { let stream = async_stream::stream! { _ = repo; _ = dag; - _ = session; _ = path; _ = providers; _ = local_only; @@ -230,29 +217,24 @@ impl Stream for UnixfsGet { yield UnixfsStatus::FailedStatus { written: 0, total_size: None, error: Some(anyhow::anyhow!("unimplemented")) }; }; - self.stream = StatusStreamState::Pending { - stream: stream.boxed(), - }; + self.stream = Some(stream.boxed()); } - StatusStreamState::Pending { stream } => { - match futures::ready!(stream.poll_next_unpin(cx)) { - Some(item) => { - if matches!( - item, - UnixfsStatus::FailedStatus { .. } - | UnixfsStatus::CompletedStatus { .. } - ) { - self.stream = StatusStreamState::Done; - } - return Poll::Ready(Some(item)); - } - None => { - self.stream = StatusStreamState::Done; - return Poll::Ready(None); + Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) { + Some(item) => { + if matches!( + item, + UnixfsStatus::FailedStatus { .. } + | UnixfsStatus::CompletedStatus { .. } + ) { + self.stream.take(); } + return Poll::Ready(Some(item)); } - } - StatusStreamState::Done => return Poll::Ready(None), + None => { + self.stream.take(); + return Poll::Ready(None); + } + }, } } } @@ -286,6 +268,6 @@ impl std::future::IntoFuture for UnixfsGet { impl FusedStream for UnixfsGet { fn is_terminated(&self) -> bool { - matches!(self.stream, StatusStreamState::Done) && self.core.is_none() + self.stream.is_none() && self.core.is_none() } } diff --git a/src/unixfs/ls.rs b/src/unixfs/ls.rs index b27822f87..45d1ee859 100644 --- a/src/unixfs/ls.rs +++ b/src/unixfs/ls.rs @@ -21,7 +21,7 @@ pub enum Entry { File { cid: Cid, file: String, size: usize }, } -#[must_use = "do nothing unless you `.await` or poll the stream"] +#[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsLs { core: Option>, span: Span, @@ -107,20 +107,9 @@ impl Stream for UnixfsLs { return Poll::Ready(None); }; - let (repo, dag, session) = match core { - Either::Left(ipfs) => ( - ipfs.repo().clone(), - ipfs.dag(), - Some( - crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst), - ), - ), - Either::Right(repo) => { - let session = repo.is_online().then(|| { - crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) - }); - (repo.clone(), IpldDag::from(repo.clone()), session) - } + let (repo, dag) = match core { + Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()), + Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())), }; let path = self.path.take().expect("path exist"); @@ -133,7 +122,7 @@ impl Stream for UnixfsLs { let stream = async_stream::stream! { let resolved = match dag - .resolve_with_session(session, path, true, &providers, local_only, timeout) + ._resolve(path, true, &providers, local_only, timeout) .await { Ok((resolved, _)) => resolved, Err(e) => { @@ -158,7 +147,7 @@ impl Stream for UnixfsLs { let mut root_directory = String::new(); while walker.should_continue() { let (next, _) = walker.pending_links(); - let block = match repo.get_block_with_session(session, next, &providers, local_only, timeout).await { + let block = match repo._get_block(next, &providers, local_only, timeout).await { Ok(block) => block, Err(error) => { yield Entry::Error { error }; diff --git a/src/unixfs/mod.rs b/src/unixfs/mod.rs index e6817e8d1..74d148fe0 100644 --- a/src/unixfs/mod.rs +++ b/src/unixfs/mod.rs @@ -3,6 +3,7 @@ //! Adding files and directory structures is supported but not exposed via an API. See examples and //! `ipfs-http`. +#[cfg(not(target_arch = "wasm32"))] use std::path::PathBuf; use anyhow::Error; @@ -34,29 +35,34 @@ pub struct IpfsUnixfs { } pub enum AddOpt { + #[cfg(not(target_arch = "wasm32"))] Path(PathBuf), Stream(BoxStream<'static, std::io::Result>), StreamWithName(String, BoxStream<'static, std::io::Result>), } +#[cfg(not(target_arch = "wasm32"))] impl From<&str> for AddOpt { fn from(value: &str) -> Self { AddOpt::Path(PathBuf::from(value)) } } +#[cfg(not(target_arch = "wasm32"))] impl From for AddOpt { fn from(value: String) -> Self { AddOpt::Path(PathBuf::from(value)) } } +#[cfg(not(target_arch = "wasm32"))] impl From<&std::path::Path> for AddOpt { fn from(path: &std::path::Path) -> Self { AddOpt::Path(path.to_path_buf()) } } +#[cfg(not(target_arch = "wasm32"))] impl From for AddOpt { fn from(path: PathBuf) -> Self { AddOpt::Path(path) @@ -146,6 +152,7 @@ impl IpfsUnixfs { pub fn add>(&self, item: I) -> UnixfsAdd { let item = item.into(); match item { + #[cfg(not(target_arch = "wasm32"))] AddOpt::Path(path) => UnixfsAdd::with_ipfs(&self.ipfs, path), AddOpt::Stream(stream) => UnixfsAdd::with_ipfs( &self.ipfs, @@ -197,14 +204,6 @@ pub enum UnixfsStatus { }, } -pub(crate) enum StatusStreamState { - None, - Pending { - stream: BoxStream<'static, UnixfsStatus>, - }, - Done, -} - /// Types of failures which can occur while walking the UnixFS graph. #[derive(Debug, thiserror::Error)] pub enum TraversalFailed { diff --git a/unixfs/Cargo.toml b/unixfs/Cargo.toml index 4fa73406a..5bd72b6cd 100644 --- a/unixfs/Cargo.toml +++ b/unixfs/Cargo.toml @@ -25,7 +25,7 @@ hex-literal = { default-features = false, version = "0.4" } libc = { default-features = false, version = "0.2" } multibase = { default-features = false, version = "0.9" } tar = { default-features = false, version = "0.4" } -criterion = { default-features = false, version = "0.4" } +criterion = { default-features = false, version = "0.5" } [[bench]] name = "ingest-tar"