Skip to content

Commit

Permalink
Arc<PeerId>
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Feb 28, 2024
1 parent f35f420 commit 1c452fe
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
/// list of CIDs each connected peer is waiting for
peers_wantlists: FnvHashMap<PeerId, PeerWantlist<S>>,
/// list of peers that wait for particular CID
peers_waiting_for_cid: FnvHashMap<CidGeneric<S>, SmallVec<[PeerId; 1]>>,
peers_waiting_for_cid: FnvHashMap<CidGeneric<S>, SmallVec<[Arc<PeerId>; 1]>>,
/// list of blocks received from blockstore or network, that connected peers may be waiting for
outgoing_queue: VecDeque<BlockWithCid<S>>,
/// list of events to be sent back to swarm when poll is called
Expand All @@ -54,7 +54,7 @@ where
}

enum TaskResult<const S: usize> {
Get(PeerId, Vec<GetCidResult<S>>),
Get(Arc<PeerId>, Vec<GetCidResult<S>>),
Cancelled,
}

Expand Down Expand Up @@ -151,7 +151,7 @@ where
}
}

fn schedule_store_get(&mut self, peer: PeerId, cids: Vec<CidGeneric<S>>) {
fn schedule_store_get(&mut self, peer: Arc<PeerId>, cids: Vec<CidGeneric<S>>) {
let store = self.store.clone();
let (_handle, reg) = AbortHandle::new_pair();

Expand All @@ -166,10 +166,10 @@ where
);
}

fn cancel_request(&mut self, peer: PeerId, cid: CidGeneric<S>) {
fn cancel_request(&mut self, peer: Arc<PeerId>, cid: CidGeneric<S>) {
// remove peer from the waitlist for cid, in case we happen to get it later
if let Entry::Occupied(mut entry) = self.peers_waiting_for_cid.entry(cid) {
if entry.get().as_ref() == [peer] {
if entry.get().as_ref() == [peer.clone()] {
entry.remove();
} else {
let peers = entry.get_mut();
Expand Down Expand Up @@ -197,16 +197,17 @@ where
removals.len()
);

let peer = Arc::new(peer);
for cid in &additions {
self.peers_waiting_for_cid
.entry(*cid)
.or_default()
.push(peer);
.push(peer.clone());
}
self.schedule_store_get(peer, additions);
self.schedule_store_get(peer.clone(), additions);

for cid in removals {
self.cancel_request(peer, cid);
self.cancel_request(peer.clone(), cid);
}
}

Expand All @@ -229,7 +230,7 @@ where
return false;
}

let mut blocks_ready_for_peer = FnvHashMap::<PeerId, Vec<(Vec<u8>, Vec<u8>)>>::default();
let mut blocks_ready_for_peer = FnvHashMap::<Arc<PeerId>, Vec<(Vec<u8>, Vec<u8>)>>::default();

while let Some((cid, data)) = self.outgoing_queue.pop_front() {
let Some(peers_waiting) = self.peers_waiting_for_cid.remove(&cid) else {
Expand All @@ -254,7 +255,7 @@ where
);
for (peer, blocks) in blocks_ready_for_peer {
self.outgoing_event_queue.push_back(ToSwarm::NotifyHandler {
peer_id: peer,
peer_id: *peer,
handler: NotifyHandler::Any,
event: ToHandlerEvent::QueueOutgoingMessages(blocks),
})
Expand All @@ -263,7 +264,7 @@ where
true
}

fn process_store_get_results(&mut self, peer: PeerId, results: Vec<GetCidResult<S>>) {
fn process_store_get_results(&mut self, peer: Arc<PeerId>, results: Vec<GetCidResult<S>>) {
for result in results {
let cid = result.cid;
match result.data {
Expand Down

0 comments on commit 1c452fe

Please sign in to comment.