diff --git a/src/lib.rs b/src/lib.rs index 6fbba23..ca3087f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -209,7 +209,7 @@ pub enum ToBehaviourEvent { #[doc(hidden)] pub enum ToHandlerEvent { SendWantlist(ProtoWantlist, Arc>), - SendSendlist(Vec<(Vec, Vec)>), + QueueOutgoingMessages(Vec<(Vec, Vec)>), } pub enum StreamRequester { @@ -245,8 +245,8 @@ impl ConnectionHandler for ConnHandler { self.client_handler.send_wantlist(wantlist, state); } - ToHandlerEvent::SendSendlist(data) => { - self.server_handler.send_sendlist(data); + ToHandlerEvent::QueueOutgoingMessages(data) => { + self.server_handler.queue_messages(data); } } } diff --git a/src/server.rs b/src/server.rs index 4fbe3f9..5f12250 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,7 +18,7 @@ use libp2p_swarm::{ ConnectionHandlerEvent, NotifyHandler, StreamProtocol, SubstreamProtocol, ToSwarm, }; use smallvec::SmallVec; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace, warn}; use crate::incoming_stream::ServerMessage; use crate::message::Codec; @@ -59,11 +59,17 @@ enum BlockstoreResult { Cancelled, } +#[derive(Debug, PartialEq)] +enum WantlistChange { + Want(CidGeneric), + DontWant(CidGeneric), +} + #[derive(Debug, Default)] struct PeerWantlist(FnvHashSet>); impl PeerWantlist { - pub fn process_wantlist(&mut self, wantlist: ProtoWantlist) -> Vec> { + pub fn process_wantlist(&mut self, wantlist: ProtoWantlist) -> Vec> { // XXX quietly drop invalid entries from wantlist, do we care about logging? if wantlist.full { let wanted_cids = wantlist @@ -89,40 +95,34 @@ impl PeerWantlist { if !entry.cancel { if self.0.insert(cid) { - results.push(WishlistChange::WantCid(cid)); + results.push(WantlistChange::Want(cid)); } } else if self.0.remove(&cid) { - results.push(WishlistChange::DoesntWantCid(cid)) + results.push(WantlistChange::DontWant(cid)) } } results } - fn wantlist_replace(&mut self, cids: FnvHashSet>) -> Vec> { - let wishlist_delta = cids + fn wantlist_replace(&mut self, cids: FnvHashSet>) -> Vec> { + let delta = cids .difference(&self.0) - .map(|cid| WishlistChange::WantCid(*cid)) + .map(|cid| WantlistChange::Want(*cid)) .chain( self.0 .difference(&cids) - .map(|cid| WishlistChange::DoesntWantCid(*cid)), + .map(|cid| WantlistChange::DontWant(*cid)), ); - let wishlist_delta = wishlist_delta.collect(); + let delta = delta.collect(); self.0 = cids; - wishlist_delta + delta } } -#[derive(Debug, PartialEq)] -enum WishlistChange { - Want(CidGeneric), - DontWant(CidGeneric), -} - impl ServerBehaviour where B: Blockstore + Send + Sync + 'static, @@ -195,11 +195,11 @@ where for change in wantlist_changes { match change { - WishlistChange::WantCid(cid) => { + WantlistChange::Want(cid) => { self.schedule_store_get(peer, cid); self.global_waitlist.entry(cid).or_default().push(peer); } - WishlistChange::DoesntWantCid(cid) => { + WantlistChange::DontWant(cid) => { self.cancel_request(peer, cid); } } @@ -216,7 +216,7 @@ where ServerConnectionHandler { protocol: self.protocol.clone(), sink: Default::default(), - sendlist: None, + pending_outgoing_messages: None, } } @@ -252,7 +252,7 @@ where self.outgoing_event_queue.push_back(ToSwarm::NotifyHandler { peer_id: peer, handler: NotifyHandler::Any, - event: ToHandlerEvent::SendSendlist(data), + event: ToHandlerEvent::QueueOutgoingMessages(data), }) } @@ -297,7 +297,7 @@ where pub(crate) struct ServerConnectionHandler { protocol: StreamProtocol, sink: SinkState, - sendlist: Option>, + pending_outgoing_messages: Option>, } impl fmt::Debug for ServerConnectionHandler { @@ -316,26 +316,19 @@ enum SinkState { impl ServerConnectionHandler { pub(crate) fn set_stream(&mut self, stream: libp2p_swarm::Stream) { - info!("got set stream"); - // Convert `AsyncWrite` stream to `Sink` self.sink = SinkState::Ready(FramedWrite::new(stream, Codec)); } - pub(crate) fn send_sendlist(&mut self, sendlist: Vec<(Vec, Vec)>) { - let block_list = sendlist + pub(crate) fn queue_messages(&mut self, messages: Vec<(Vec, Vec)>) { + let block_list = messages .into_iter() .map(|(prefix, data)| ProtoBlock { prefix, data }) .collect::>(); - self.sendlist + self.pending_outgoing_messages .get_or_insert(Vec::with_capacity(block_list.len())) .extend(block_list); - - info!( - "updated sendlist len: {:?}", - self.sendlist.as_ref().map(|s| s.len()) - ); } fn open_new_substream( @@ -343,8 +336,6 @@ impl ServerConnectionHandler { ) -> Poll< ConnectionHandlerEvent, StreamRequester, ToBehaviourEvent>, > { - info!("requesting new substream"); - self.sink = SinkState::Requested; Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { @@ -362,7 +353,7 @@ impl ServerConnectionHandler { ConnectionHandlerEvent, StreamRequester, ToBehaviourEvent>, > { loop { - match (&mut self.sendlist, &mut self.sink) { + match (&mut self.pending_outgoing_messages, &mut self.sink) { (_, SinkState::Requested) => return Poll::Pending, (None, SinkState::None) => return Poll::Pending, (None, SinkState::Ready(sink)) => { @@ -372,12 +363,10 @@ impl ServerConnectionHandler { return Poll::Pending; } (Some(_), SinkState::None) => return self.open_new_substream(), - (sendlist @ Some(_), SinkState::Ready(sink)) => { - let sendlist = sendlist.take().expect("sendlist can't be None here"); - let blocks = sendlist.len(); - + (pending_messages @ Some(_), SinkState::Ready(sink)) => { + let messages = pending_messages.take().expect("pending_messages can't be None here"); let message = Message { - payload: sendlist, + payload: messages, ..Message::default() }; @@ -386,7 +375,6 @@ impl ServerConnectionHandler { continue; } - info!("start_send: {} blocks", blocks); if sink.start_send_unpin(&message).is_err() { self.close_sink("start_send_unpin"); continue; @@ -420,7 +408,7 @@ mod tests { use multihash::Multihash; #[test] - fn wishlist_replace() { + fn wantlist_replace() { let initial_cids = (0..512_i32).map(|v| Cid::new_v1(24, Multihash::wrap(42, &v.to_le_bytes()).unwrap())); let replacing_cids = (513..1024_i32) @@ -430,7 +418,7 @@ mod tests { let initial_events = wantlist.wantlist_replace(initial_cids.clone().collect()); assert_eq!(initial_cids.len(), initial_events.len()); for cid in initial_cids.clone() { - assert!(initial_events.contains(&WishlistChange::WantCid(cid))); + assert!(initial_events.contains(&WantlistChange::Want(cid))); } let replacing_events = wantlist.wantlist_replace(replacing_cids.clone().collect()); @@ -439,15 +427,15 @@ mod tests { initial_cids.len() + replacing_cids.len() ); for cid in replacing_cids { - assert!(replacing_events.contains(&WishlistChange::WantCid(cid))); + assert!(replacing_events.contains(&WantlistChange::Want(cid))); } for cid in initial_cids { - assert!(replacing_events.contains(&WishlistChange::DoesntWantCid(cid))); + assert!(replacing_events.contains(&WantlistChange::DontWant(cid))); } } #[test] - fn wishlist_replace_overlaping() { + fn wantlist_replace_overlaping() { let initial_cids = (0..600_i32) .map(|v| Cid::new_v1(24, Multihash::wrap(42, &v.to_le_bytes()).unwrap())) .collect(); @@ -467,10 +455,10 @@ mod tests { .collect(); assert_eq!(events.len(), added_cids.len() + removed_cids.len()); for cid in added_cids { - assert!(events.contains(&WishlistChange::WantCid(cid))); + assert!(events.contains(&WantlistChange::Want(cid))); } for cid in removed_cids { - assert!(events.contains(&WishlistChange::DoesntWantCid(cid))); + assert!(events.contains(&WantlistChange::DontWant(cid))); } } diff --git a/tests/bitswap.rs b/tests/bitswap.rs index 1ad8479..e90119f 100644 --- a/tests/bitswap.rs +++ b/tests/bitswap.rs @@ -19,7 +19,10 @@ async fn test_client_request() { let received0 = client.request_cid(data_with_cid[0].0); drop(client.connect(&server)); - let received1 = client.request_cid(data_with_cid[1].0).await.expect("could not get CID"); + let received1 = client + .request_cid(data_with_cid[1].0) + .await + .expect("could not get CID"); let received0 = received0.await.expect("could not get CID"); assert_eq!(&received0[..], data_with_cid[0].1.as_bytes()); @@ -40,7 +43,10 @@ async fn test_server_request() { let received0 = client.request_cid(data_with_cid[0].0); drop(client.connect(&server)); - let received1 = server.request_cid(data_with_cid[1].0).await.expect("could not get CID"); + let received1 = server + .request_cid(data_with_cid[1].0) + .await + .expect("could not get CID"); let received0 = received0.await.expect("Could not get CID"); assert_eq!(&received0[..], data_with_cid[0].1.as_bytes());