Skip to content

Commit

Permalink
commiting because I've managed to crash the compiler
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Feb 27, 2024
1 parent 5ed93cd commit b696c25
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 52 deletions.
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ pub enum ToBehaviourEvent<const S: usize> {
#[doc(hidden)]
pub enum ToHandlerEvent {
SendWantlist(ProtoWantlist, Arc<Mutex<SendingState>>),
SendSendlist(Vec<(Vec<u8>, Vec<u8>)>),
QueueOutgoingMessages(Vec<(Vec<u8>, Vec<u8>)>),
}

pub enum StreamRequester {
Expand Down Expand Up @@ -245,8 +245,8 @@ impl<const MAX_MULTIHASH_SIZE: usize> ConnectionHandler for ConnHandler<MAX_MULT
ToHandlerEvent::SendWantlist(wantlist, state) => {
self.client_handler.send_wantlist(wantlist, state);
}
ToHandlerEvent::SendSendlist(data) => {
self.server_handler.send_sendlist(data);
ToHandlerEvent::QueueOutgoingMessages(data) => {
self.server_handler.queue_messages(data);
}
}
}
Expand Down
82 changes: 35 additions & 47 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use libp2p_swarm::{
ConnectionHandlerEvent, NotifyHandler, StreamProtocol, SubstreamProtocol, ToSwarm,
};
use smallvec::SmallVec;
use tracing::{debug, info, trace, warn};
use tracing::{debug, trace, warn};

use crate::incoming_stream::ServerMessage;
use crate::message::Codec;
Expand Down Expand Up @@ -59,11 +59,17 @@ enum BlockstoreResult<const S: usize> {
Cancelled,
}

#[derive(Debug, PartialEq)]
enum WantlistChange<const S: usize> {
Want(CidGeneric<S>),
DontWant(CidGeneric<S>),
}

#[derive(Debug, Default)]
struct PeerWantlist<const S: usize>(FnvHashSet<CidGeneric<S>>);

impl<const S: usize> PeerWantlist<S> {
pub fn process_wantlist(&mut self, wantlist: ProtoWantlist) -> Vec<WishlistChange<S>> {
pub fn process_wantlist(&mut self, wantlist: ProtoWantlist) -> Vec<WantlistChange<S>> {
// XXX quietly drop invalid entries from wantlist, do we care about logging?
if wantlist.full {
let wanted_cids = wantlist
Expand All @@ -89,40 +95,34 @@ impl<const S: usize> PeerWantlist<S> {

if !entry.cancel {
if self.0.insert(cid) {
results.push(WishlistChange::WantCid(cid));
results.push(WantlistChange::Want(cid));
}
} else if self.0.remove(&cid) {
results.push(WishlistChange::DoesntWantCid(cid))
results.push(WantlistChange::DontWant(cid))
}
}

results
}

fn wantlist_replace(&mut self, cids: FnvHashSet<CidGeneric<S>>) -> Vec<WishlistChange<S>> {
let wishlist_delta = cids
fn wantlist_replace(&mut self, cids: FnvHashSet<CidGeneric<S>>) -> Vec<WantlistChange<S>> {
let delta = cids
.difference(&self.0)
.map(|cid| WishlistChange::WantCid(*cid))
.map(|cid| WantlistChange::Want(*cid))
.chain(
self.0
.difference(&cids)
.map(|cid| WishlistChange::DoesntWantCid(*cid)),
.map(|cid| WantlistChange::DontWant(*cid)),
);

let wishlist_delta = wishlist_delta.collect();
let delta = delta.collect();

self.0 = cids;

wishlist_delta
delta
}
}

#[derive(Debug, PartialEq)]
enum WishlistChange<const S: usize> {
Want(CidGeneric<S>),
DontWant(CidGeneric<S>),
}

impl<const S: usize, B> ServerBehaviour<S, B>
where
B: Blockstore + Send + Sync + 'static,
Expand Down Expand Up @@ -195,11 +195,11 @@ where

for change in wantlist_changes {
match change {
WishlistChange::WantCid(cid) => {
WantlistChange::Want(cid) => {
self.schedule_store_get(peer, cid);
self.global_waitlist.entry(cid).or_default().push(peer);
}
WishlistChange::DoesntWantCid(cid) => {
WantlistChange::DontWant(cid) => {
self.cancel_request(peer, cid);
}
}
Expand All @@ -216,7 +216,7 @@ where
ServerConnectionHandler {
protocol: self.protocol.clone(),
sink: Default::default(),
sendlist: None,
pending_outgoing_messages: None,
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ where
self.outgoing_event_queue.push_back(ToSwarm::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::Any,
event: ToHandlerEvent::SendSendlist(data),
event: ToHandlerEvent::QueueOutgoingMessages(data),
})
}

Expand Down Expand Up @@ -297,7 +297,7 @@ where
pub(crate) struct ServerConnectionHandler<const S: usize> {
protocol: StreamProtocol,
sink: SinkState,
sendlist: Option<Vec<ProtoBlock>>,
pending_outgoing_messages: Option<Vec<ProtoBlock>>,
}

impl<const S: usize> fmt::Debug for ServerConnectionHandler<S> {
Expand All @@ -316,35 +316,26 @@ enum SinkState {

impl<const S: usize> ServerConnectionHandler<S> {
pub(crate) fn set_stream(&mut self, stream: libp2p_swarm::Stream) {
info!("got set stream");

// Convert `AsyncWrite` stream to `Sink`
self.sink = SinkState::Ready(FramedWrite::new(stream, Codec));
}

pub(crate) fn send_sendlist(&mut self, sendlist: Vec<(Vec<u8>, Vec<u8>)>) {
let block_list = sendlist
pub(crate) fn queue_messages(&mut self, messages: Vec<(Vec<u8>, Vec<u8>)>) {
let block_list = messages
.into_iter()
.map(|(prefix, data)| ProtoBlock { prefix, data })
.collect::<Vec<_>>();

self.sendlist
self.pending_outgoing_messages
.get_or_insert(Vec::with_capacity(block_list.len()))
.extend(block_list);

info!(
"updated sendlist len: {:?}",
self.sendlist.as_ref().map(|s| s.len())
);
}

fn open_new_substream(
&mut self,
) -> Poll<
ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, StreamRequester, ToBehaviourEvent<S>>,
> {
info!("requesting new substream");

self.sink = SinkState::Requested;

Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
Expand All @@ -362,7 +353,7 @@ impl<const S: usize> ServerConnectionHandler<S> {
ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, StreamRequester, ToBehaviourEvent<S>>,
> {
loop {
match (&mut self.sendlist, &mut self.sink) {
match (&mut self.pending_outgoing_messages, &mut self.sink) {
(_, SinkState::Requested) => return Poll::Pending,
(None, SinkState::None) => return Poll::Pending,
(None, SinkState::Ready(sink)) => {
Expand All @@ -372,12 +363,10 @@ impl<const S: usize> ServerConnectionHandler<S> {
return Poll::Pending;
}
(Some(_), SinkState::None) => return self.open_new_substream(),
(sendlist @ Some(_), SinkState::Ready(sink)) => {
let sendlist = sendlist.take().expect("sendlist can't be None here");
let blocks = sendlist.len();

(pending_messages @ Some(_), SinkState::Ready(sink)) => {
let messages = pending_messages.take().expect("pending_messages can't be None here");
let message = Message {
payload: sendlist,
payload: messages,
..Message::default()
};

Expand All @@ -386,7 +375,6 @@ impl<const S: usize> ServerConnectionHandler<S> {
continue;
}

info!("start_send: {} blocks", blocks);
if sink.start_send_unpin(&message).is_err() {
self.close_sink("start_send_unpin");
continue;
Expand Down Expand Up @@ -420,7 +408,7 @@ mod tests {
use multihash::Multihash;

#[test]
fn wishlist_replace() {
fn wantlist_replace() {
let initial_cids =
(0..512_i32).map(|v| Cid::new_v1(24, Multihash::wrap(42, &v.to_le_bytes()).unwrap()));
let replacing_cids = (513..1024_i32)
Expand All @@ -430,7 +418,7 @@ mod tests {
let initial_events = wantlist.wantlist_replace(initial_cids.clone().collect());
assert_eq!(initial_cids.len(), initial_events.len());
for cid in initial_cids.clone() {
assert!(initial_events.contains(&WishlistChange::WantCid(cid)));
assert!(initial_events.contains(&WantlistChange::Want(cid)));
}

let replacing_events = wantlist.wantlist_replace(replacing_cids.clone().collect());
Expand All @@ -439,15 +427,15 @@ mod tests {
initial_cids.len() + replacing_cids.len()
);
for cid in replacing_cids {
assert!(replacing_events.contains(&WishlistChange::WantCid(cid)));
assert!(replacing_events.contains(&WantlistChange::Want(cid)));
}
for cid in initial_cids {
assert!(replacing_events.contains(&WishlistChange::DoesntWantCid(cid)));
assert!(replacing_events.contains(&WantlistChange::DontWant(cid)));
}
}

#[test]
fn wishlist_replace_overlaping() {
fn wantlist_replace_overlaping() {
let initial_cids = (0..600_i32)
.map(|v| Cid::new_v1(24, Multihash::wrap(42, &v.to_le_bytes()).unwrap()))
.collect();
Expand All @@ -467,10 +455,10 @@ mod tests {
.collect();
assert_eq!(events.len(), added_cids.len() + removed_cids.len());
for cid in added_cids {
assert!(events.contains(&WishlistChange::WantCid(cid)));
assert!(events.contains(&WantlistChange::Want(cid)));
}
for cid in removed_cids {
assert!(events.contains(&WishlistChange::DoesntWantCid(cid)));
assert!(events.contains(&WantlistChange::DontWant(cid)));
}
}

Expand Down
10 changes: 8 additions & 2 deletions tests/bitswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ async fn test_client_request() {
let received0 = client.request_cid(data_with_cid[0].0);

drop(client.connect(&server));
let received1 = client.request_cid(data_with_cid[1].0).await.expect("could not get CID");
let received1 = client
.request_cid(data_with_cid[1].0)
.await
.expect("could not get CID");
let received0 = received0.await.expect("could not get CID");

assert_eq!(&received0[..], data_with_cid[0].1.as_bytes());
Expand All @@ -40,7 +43,10 @@ async fn test_server_request() {
let received0 = client.request_cid(data_with_cid[0].0);

drop(client.connect(&server));
let received1 = server.request_cid(data_with_cid[1].0).await.expect("could not get CID");
let received1 = server
.request_cid(data_with_cid[1].0)
.await
.expect("could not get CID");
let received0 = received0.await.expect("Could not get CID");

assert_eq!(&received0[..], data_with_cid[0].1.as_bytes());
Expand Down

0 comments on commit b696c25

Please sign in to comment.