Skip to content

Commit

Permalink
refactor: Remove sessions and redundant code (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Jul 4, 2024
1 parent 6102dae commit 58671a4
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 278 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- refactor: Remove redundant async signature. [PR 247](https://github.com/dariusc93/rust-ipfs/pull/247)
- refactor: Add `Serialize` for {Ipfs, IpldDag)::put_dag. [PR 249](https://github.com/dariusc93/rust-ipfs/pull/249)
- feat: Add `Ipfs::{dhy_get_providers,dht_provide)`, supporting `RecordKey` directly. [PR 250](https://github.com/dariusc93/rust-ipfs/pull/250)
- refactor: Remove sessions and redundant code. [PR 255](https://github.com/dariusc93/rust-ipfs/pull/255)

# 0.11.20
- feat: Add Ipfs::{add,remove}_external_address.
Expand Down
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ tokio-util = { default-features = false, version = "0.7" }
unsigned-varint = { version = "0.7.1", features = ["asynchronous_codec"] }
void = { default-features = false, version = "1" }
wasm-bindgen-futures = { version = "0.4" }
wasm-timer = "0.2"
web-time = "1.1.0"
zeroize = "1"

Expand Down Expand Up @@ -125,7 +124,6 @@ tracing = { default-features = false, features = ["log"], workspace = true }
tracing-futures = { workspace = true }
unsigned-varint.workspace = true
void = { default-features = false, workspace = true }
wasm-timer.workspace = true
web-time.workspace = true
zeroize.workspace = true

Expand Down
5 changes: 1 addition & 4 deletions examples/dag_creation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

// Initialize the repo and start a daemon
let ipfs: Ipfs = UninitializedIpfs::new()
.add_listening_addr("/ip4/0.0.0.0/tcp/0".parse()?)
.start()
.await?;
let ipfs: Ipfs = UninitializedIpfs::new().start().await?;

// Create a DAG
let cid1 = ipfs.put_dag(ipld!("block1")).await?;
Expand Down
42 changes: 7 additions & 35 deletions src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ impl IpldDag {
DagGet::new(self.clone())
}

pub(crate) async fn get_with_session(
pub(crate) async fn _get(
&self,
session: Option<u64>,
path: IpfsPath,
providers: &[PeerId],
local_only: bool,
Expand All @@ -256,9 +255,7 @@ impl IpldDag {
let mut iter = resolved_path.iter().peekable();

let (node, _) = match self
.resolve0(
session, cid, &mut iter, true, providers, local_only, timeout,
)
.resolve0(cid, &mut iter, true, providers, local_only, timeout)
.await
{
Ok(t) => t,
Expand Down Expand Up @@ -289,13 +286,12 @@ impl IpldDag {
providers: &[PeerId],
local_only: bool,
) -> Result<(ResolvedNode, SlashedPath), ResolveError> {
self.resolve_with_session(None, path, follow_links, providers, local_only, None)
self._resolve(path, follow_links, providers, local_only, None)
.await
}

pub(crate) async fn resolve_with_session(
pub(crate) async fn _resolve(
&self,
session: Option<u64>,
path: IpfsPath,
follow_links: bool,
providers: &[PeerId],
Expand Down Expand Up @@ -323,15 +319,7 @@ impl IpldDag {
let (node, matched_segments) = {
let mut iter = resolved_path.iter().peekable();
match self
.resolve0(
session,
cid,
&mut iter,
follow_links,
providers,
local_only,
timeout,
)
.resolve0(cid, &mut iter, follow_links, providers, local_only, timeout)
.await
{
Ok(t) => t,
Expand All @@ -354,7 +342,6 @@ impl IpldDag {
#[allow(clippy::too_many_arguments)]
async fn resolve0<'a>(
&self,
session: Option<u64>,
cid: &Cid,
segments: &mut Peekable<impl Iterator<Item = &'a str>>,
follow_links: bool,
Expand All @@ -372,7 +359,7 @@ impl IpldDag {
loop {
let block = match self
.repo
.get_block_with_session(session, &current, providers, local_only, timeout)
._get_block(&current, providers, local_only, timeout)
.await
{
Ok(block) => block,
Expand Down Expand Up @@ -443,7 +430,6 @@ impl IpldDag {
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DagGet {
dag_ipld: IpldDag,
session: Option<u64>,
path: Option<IpfsPath>,
providers: Vec<PeerId>,
local: bool,
Expand All @@ -455,7 +441,6 @@ impl DagGet {
pub fn new(dag: IpldDag) -> Self {
Self {
dag_ipld: dag,
session: None,
path: None,
providers: vec![],
local: false,
Expand All @@ -464,13 +449,6 @@ impl DagGet {
}
}

/// Bitswap session
#[allow(dead_code)]
pub(crate) fn session(mut self, session: u64) -> Self {
self.session = Some(session);
self
}

/// Path to object
pub fn path<P: Into<IpfsPath>>(mut self, path: P) -> Self {
let path = path.into();
Expand Down Expand Up @@ -535,13 +513,7 @@ impl std::future::IntoFuture for DagGet {
async move {
let path = self.path.ok_or(ResolveError::PathNotProvided)?;
self.dag_ipld
.get_with_session(
self.session,
path,
&self.providers,
self.local,
self.timeout,
)
._get(path, &self.providers, self.local, self.timeout)
.await
}
.instrument(span)
Expand Down
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use std::{
fmt,
ops::{Deref, DerefMut},
path::Path,
sync::{atomic::AtomicU64, Arc},
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -126,8 +126,6 @@ use libp2p::{
pub use libp2p_connection_limits::ConnectionLimits;
use serde::Serialize;

pub(crate) static BITSWAP_ID: AtomicU64 = AtomicU64::new(1);

#[allow(dead_code)]
#[deprecated(note = "Use `StoreageType` instead")]
type StoragePath = StorageType;
Expand Down
2 changes: 1 addition & 1 deletion src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ where
let borrowed = repo.borrow();

let block = if download_blocks {
match borrowed.get_block_with_session(None, &cid, &providers, !download_blocks, timeout).await {
match borrowed._get_block(&cid, &providers, !download_blocks, timeout).await {
Ok(block) => block,
Err(e) => {
warn!("failed to load {}, linked from {}: {}", cid, source, e);
Expand Down
32 changes: 9 additions & 23 deletions src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ pub(crate) struct RepoInner {
#[derive(Debug)]
pub enum RepoEvent {
/// Signals a desired block.
WantBlock(Option<u64>, Vec<Cid>, Vec<PeerId>),
WantBlock(Vec<Cid>, Vec<PeerId>),
/// Signals a desired block is no longer wanted.
UnwantBlock(Cid),
/// Signals the posession of a new block.
Expand Down Expand Up @@ -616,8 +616,7 @@ impl Repo {
peers: &[PeerId],
local_only: bool,
) -> Result<Block, Error> {
self.get_block_with_session(None, cid, peers, local_only, None)
.await
self._get_block(cid, peers, local_only, None).await
}

/// Retrives a set of blocks from the block store, or starts fetching them from the network and awaits
Expand All @@ -629,8 +628,7 @@ impl Repo {
peers: &[PeerId],
local_only: bool,
) -> Result<BoxStream<'static, Result<Block, Error>>, Error> {
self.get_blocks_with_session(None, cids, peers, local_only, None)
.await
self._get_blocks(cids, peers, local_only, None).await
}

/// Get the size of listed blocks
Expand All @@ -645,9 +643,8 @@ impl Repo {
self.inner.block_store.total_size().await
}

pub(crate) async fn get_blocks_with_session(
pub(crate) async fn _get_blocks(
&self,
session: impl Into<Option<u64>>,
cids: &[Cid],
peers: &[PeerId],
local_only: bool,
Expand Down Expand Up @@ -713,29 +710,22 @@ impl Repo {
}

events
.send(RepoEvent::WantBlock(
session.into(),
missing,
peers.to_vec(),
))
.send(RepoEvent::WantBlock(missing, peers.to_vec()))
.await
.ok();

Ok(blocks.boxed())
}

pub(crate) async fn get_block_with_session(
pub(crate) async fn _get_block(
&self,
session: impl Into<Option<u64>>,
cid: &Cid,
peers: &[PeerId],
local_only: bool,
timeout: impl Into<Option<Duration>>,
) -> Result<Block, Error> {
let cids = vec![*cid];
let mut blocks = self
.get_blocks_with_session(session, &cids, peers, local_only, timeout)
.await?;
let mut blocks = self._get_blocks(&cids, peers, local_only, timeout).await?;

blocks
.next()
Expand Down Expand Up @@ -1124,9 +1114,7 @@ impl std::future::IntoFuture for RepoFetch {
async move {
// Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future
let _g = repo.inner.gclock.read().await;
let block = repo
.get_block_with_session(None, &cid, &providers, false, timeout)
.await?;
let block = repo._get_block(&cid, &providers, false, timeout).await?;

if !recursive {
return Ok(());
Expand Down Expand Up @@ -1255,9 +1243,7 @@ impl std::future::IntoFuture for RepoInsertPin {
async move {
// Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future
let _g = repo.inner.gclock.read().await;
let block = repo
.get_block_with_session(None, &cid, &providers, local, timeout)
.await?;
let block = repo._get_block(&cid, &providers, local, timeout).await?;

if !recursive {
repo.insert_direct_pin(&cid).await?
Expand Down
24 changes: 6 additions & 18 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ use futures::{
use crate::TSwarmEvent;
use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent};

use wasm_timer::Interval;

use std::{
collections::{hash_map::Entry, HashMap, HashSet},
time::Duration,
};

use futures_timer::Delay;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -60,15 +59,12 @@ pub(crate) struct IpfsTask<C: NetworkBehaviour<ToSwarm = void::Void>> {
pub(crate) from_facade: Fuse<Receiver<IpfsEvent>>,
pub(crate) listening_addresses: HashMap<ListenerId, Vec<Multiaddr>>,
pub(crate) provider_stream: HashMap<QueryId, UnboundedSender<PeerId>>,
pub(crate) bitswap_provider_stream:
HashMap<QueryId, futures::channel::mpsc::Sender<Result<HashSet<PeerId>, String>>>,
pub(crate) record_stream: HashMap<QueryId, UnboundedSender<Record>>,
pub(crate) repo: Repo,
pub(crate) kad_subscriptions: HashMap<QueryId, Channel<KadResult>>,
pub(crate) dht_peer_lookup: HashMap<PeerId, Vec<Channel<libp2p::identify::Info>>>,
pub(crate) bootstraps: HashSet<Multiaddr>,
pub(crate) swarm_event: Option<TSwarmEventFn<C>>,
pub(crate) bitswap_sessions: HashMap<i64, libipld::Cid>,
pub(crate) pubsub_event_stream: Vec<UnboundedSender<InnerPubsubEvent>>,
pub(crate) timer: TaskTimer,
pub(crate) local_external_addr: bool,
Expand Down Expand Up @@ -96,10 +92,8 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
from_facade,
swarm,
provider_stream: HashMap::new(),
bitswap_provider_stream: Default::default(),
record_stream: HashMap::new(),
dht_peer_lookup: Default::default(),
bitswap_sessions: Default::default(),
pubsub_event_stream: Default::default(),
kad_subscriptions: Default::default(),
repo: repo.clone(),
Expand All @@ -121,12 +115,12 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
}

pub(crate) struct TaskTimer {
pub(crate) event_cleanup: Interval,
pub(crate) event_cleanup: Delay,
}

impl Default for TaskTimer {
fn default() -> Self {
let event_cleanup = Interval::new(Duration::from_secs(60));
let event_cleanup = Delay::new(Duration::from_secs(60));

Self { event_cleanup }
}
Expand Down Expand Up @@ -158,8 +152,9 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> futures::Future for IpfsTask<C>
}
}

if self.timer.event_cleanup.poll_next_unpin(cx).is_ready() {
if self.timer.event_cleanup.poll_unpin(cx).is_ready() {
self.pubsub_event_stream.retain(|ch| !ch.is_closed());
self.timer.event_cleanup.reset(Duration::from_secs(60));
}

Poll::Pending
Expand All @@ -168,7 +163,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> futures::Future for IpfsTask<C>

impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
pub(crate) async fn run(&mut self) {
let mut session_cleanup = futures_timer::Delay::new(Duration::from_secs(5 * 60));
let mut event_cleanup = futures_timer::Delay::new(Duration::from_secs(60));

loop {
Expand All @@ -190,9 +184,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
self.pubsub_event_stream.retain(|ch| !ch.is_closed());
event_cleanup.reset(Duration::from_secs(60));
}
_ = &mut session_cleanup => {
session_cleanup.reset(Duration::from_secs(5 * 60));
}
}
}
}
Expand Down Expand Up @@ -435,9 +426,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
if let Some(tx) = self.provider_stream.remove(&id) {
tx.close_channel();
}
if let Some(tx) = self.bitswap_provider_stream.remove(&id) {
drop(tx);
}
}
}
GetProviders(Err(GetProvidersError::Timeout { key, .. })) => {
Expand Down Expand Up @@ -1553,7 +1541,7 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {

fn handle_repo_event(&mut self, event: RepoEvent) {
match event {
RepoEvent::WantBlock(_, cids, peers) => {
RepoEvent::WantBlock(cids, peers) => {
let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else {
return;
};
Expand Down
Loading

0 comments on commit 58671a4

Please sign in to comment.