feat: Send full wantlist every 30 seconds #7

Merged (2 commits) on Jan 19, 2024
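At a high level, the change replaces the per-peer `last_send_full_tm: Option<Instant>` timestamp with a per-peer `send_full: bool` flag plus a single shared `futures_timer::Delay` owned by the behaviour: when the timer fires, every peer is flagged and the next update generated for it is a full wantlist (new peers start with the flag set, so their first update is always full). A rough sketch of that flow, with simplified names rather than the exact types from the diff below:

use std::task::{Context, Poll};
use std::time::Duration;

use futures::FutureExt;
use futures_timer::Delay;

// 30 seconds, matching the interval in the PR title.
const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30);

struct Peer {
    // When set, the next update generated for this peer is a full wantlist.
    send_full: bool,
}

struct Client {
    peers: Vec<Peer>,
    // One shared timer instead of a per-peer `last_send_full_tm` timestamp.
    send_full_timer: Delay,
}

impl Client {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            // Timer fired: flag every peer and re-arm for the next interval.
            if self.send_full_timer.poll_unpin(cx).is_ready() {
                for peer in &mut self.peers {
                    peer.send_full = true;
                }
                self.send_full_timer.reset(SEND_FULL_INTERVAL);
                continue;
            }

            // ... generate wantlist updates here, emitting a full list (and
            // clearing the flag) for every peer whose `send_full` is set ...

            return Poll::Pending;
        }
    }
}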
Changes from all commits
92 changes: 92 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -12,6 +12,7 @@ bytes = "1.5.0"
 cid = "0.11.0"
 fnv = "1.0.7"
 futures = "0.3.30"
+futures-timer = "3.0.2"
 libp2p = "0.53.2"
 multihash = "0.19.1"
 multihash-codetable = "0.1.1"
@@ -25,3 +26,6 @@ void = "1.0.2"
 hex = "0.4.3"
 multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] }
 tokio = { version = "1.35.1", features = ["rt", "macros", "time"] }
+
+[features]
+wasm-bindgen = ["futures-timer/wasm-bindgen"]
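The new `[features]` table forwards a `wasm-bindgen` feature to `futures-timer`, whose default timer backend is built on std threading primitives that are not available on wasm32-unknown-unknown; enabling the feature switches `Delay` to a browser-friendly backend. A downstream crate targeting wasm could enable it roughly like this (the crate name below is a placeholder, not taken from this repository):

[dependencies]
# Hypothetical manifest of an application using this crate in the browser;
# "bitswap-client" stands in for the real crate name.
bitswap-client = { version = "0.1", features = ["wasm-bindgen"] }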
81 changes: 47 additions & 34 deletions src/client.rs
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
 use std::fmt;
 use std::sync::Arc;
 use std::task::{ready, Context, Poll};
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 use asynchronous_codec::FramedWrite;
 use blockstore::{Blockstore, BlockstoreError};
@@ -12,6 +12,7 @@ use futures::future::{AbortHandle, Abortable, BoxFuture};
 use futures::stream::FuturesUnordered;
 use futures::task::AtomicWaker;
 use futures::{FutureExt, SinkExt, StreamExt};
+use futures_timer::Delay;
 use libp2p::swarm::NotifyHandler;
 use libp2p::PeerId;
 use libp2p::{
@@ -75,13 +76,14 @@
     next_query_id: u64,
     waker: Arc<AtomicWaker>,
     multihasher: Arc<MultihasherTable<S>>,
+    send_full_timer: Delay,
 }
 
 #[derive(Debug)]
 struct PeerState<const S: usize> {
     sending: Arc<Mutex<SendingState>>,
     wantlist: WantlistState<S>,
-    last_send_full_tm: Option<Instant>,
+    send_full: bool,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -117,6 +119,7 @@
             next_query_id: 0,
             waker: Arc::new(AtomicWaker::new()),
             multihasher,
+            send_full_timer: Delay::new(SEND_FULL_INTERVAL),
         })
     }
 
@@ -126,7 +129,7 @@
             PeerState {
                 sending: Arc::new(Mutex::new(SendingState::Ready)),
                 wantlist: WantlistState::new(),
-                last_send_full_tm: None,
+                send_full: true,
             },
         );
 
@@ -294,12 +297,7 @@
                         continue;
                     }
                 }
-                SendingState::Ready => match state.last_send_full_tm {
-                    // Send full list if interval time is elapsed.
-                    Some(tm) => tm.elapsed() >= SEND_FULL_INTERVAL,
-                    // Send full list the first time.
-                    None => true,
-                },
+                SendingState::Ready => state.send_full,
                 // State is poisoned, send full list to recover.
                 SendingState::Poisoned => true,
             };
@@ -310,18 +308,15 @@
                 state.wantlist.generate_proto_update(&self.wantlist)
             };
 
-            if wantlist.entries.is_empty() {
-                // Nothing to send
-                //
-                // TODO: What if the send_full is true? Shouldn't we send it to clear
-                // the wantlist? However we should do it once.
+            // Allow empty entries to be sent when send_full flag is set.
+            if send_full {
+                // Reset flag
+                state.send_full = false;
+            } else if wantlist.entries.is_empty() {
+                // No updates to be sent for this peer
                 continue;
             }
 
-            if wantlist.full {
-                state.last_send_full_tm = Some(Instant::now());
-            }
-
             self.queue.push_back(ToSwarm::NotifyHandler {
                 peer_id: peer.to_owned(),
                 handler: NotifyHandler::Any,
@@ -345,6 +340,16 @@
                 return Poll::Ready(ev);
             }
 
+            if self.send_full_timer.poll_unpin(cx).is_ready() {
+                for state in self.peers.values_mut() {
+                    state.send_full = true;
+                }
+
+                // Reset timer and loop again to get it registered
+                self.send_full_timer.reset(SEND_FULL_INTERVAL);
+                continue;
+            }
+
             if let Poll::Ready(Some(task_result)) = self.tasks.poll_next_unpin(cx) {
                 match task_result {
                     // Blockstore already has the data so return them to the user
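The "loop again to get it registered" comment is the subtle part of the hunk above: `Delay::reset` only moves the deadline, and the timer has to be polled again afterwards for the new deadline to be registered with the task's waker; the `continue` restarts the poll loop, which performs that extra poll. A standalone sketch of the same pattern (the `Tick` type is hypothetical, not from this crate):

use std::task::{Context, Poll};
use std::time::Duration;

use futures::FutureExt;
use futures_timer::Delay;

const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30);

struct Tick {
    timer: Delay,
}

impl Tick {
    // Resolves once per interval while keeping the task registered for wakeup.
    fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.timer.poll_unpin(cx).is_ready() {
            // Re-arm the timer for the next interval...
            self.timer.reset(SEND_FULL_INTERVAL);
            // ...and poll it once more: without this the fresh deadline may
            // never register the waker, so the task might not be woken again.
            let _ = self.timer.poll_unpin(cx);
            return Poll::Ready(());
        }
        Poll::Pending
    }
}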
@@ -686,28 +691,36 @@
 
         // Simulate that full wantlist is needed
         for peer_state in client.peers.values_mut() {
-            *peer_state.last_send_full_tm.as_mut().unwrap() -= SEND_FULL_INTERVAL;
+            peer_state.send_full = true;
         }
 
-        let ev = poll_fn(|cx| client.poll(cx)).await;
-        let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev);
-
-        // wantlist should be generated only for peer2, because peer1 already replied with DontHave
-        assert_eq!(peer_id, peer2);
-        assert_eq!(wantlist.entries.len(), 1);
-        assert!(wantlist.full);
+        for _ in 0..2 {
+            let ev = poll_fn(|cx| client.poll(cx)).await;
+            let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev);
 
-        let entry = &wantlist.entries[0];
-        assert_eq!(entry.block, cid1.to_bytes());
-        assert!(!entry.cancel);
-        assert_eq!(entry.wantType, WantType::Have);
-        assert!(entry.sendDontHave);
+            if peer_id == peer1 {
+                // full wantlist of peer1 will be empty because it already replied with DontHave
+                assert!(wantlist.entries.is_empty());
+                assert!(wantlist.full);
+            } else if peer_id == peer2 {
+                assert_eq!(wantlist.entries.len(), 1);
+                assert!(wantlist.full);
+
+                let entry = &wantlist.entries[0];
+                assert_eq!(entry.block, cid1.to_bytes());
+                assert!(!entry.cancel);
+                assert_eq!(entry.wantType, WantType::Have);
+                assert!(entry.sendDontHave);
+            } else {
+                panic!("Unknown peer id");
+            }
 
-        // Mark send state as ready
-        *send_state.lock().unwrap() = SendingState::Ready;
+            // Mark send state as ready
+            *send_state.lock().unwrap() = SendingState::Ready;
+        }
 
         // No other events should be produced
-        assert!(poll_fn_once(|cx| client.poll(cx)).await.is_none());
+        assert!(dbg!(poll_fn_once(|cx| client.poll(cx)).await).is_none());
     }
 
     #[tokio::test]