From 1c452fe924bbae481079c723bfeb1d3cc71e42f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Florkiewicz?= Date: Wed, 28 Feb 2024 17:53:07 +0100 Subject: [PATCH] Arc --- src/server.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/server.rs b/src/server.rs index ae473cb..d7fc1c8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -44,7 +44,7 @@ where /// list of CIDs each connected peer is waiting for peers_wantlists: FnvHashMap>, /// list of peers that wait for particular CID - peers_waiting_for_cid: FnvHashMap, SmallVec<[PeerId; 1]>>, + peers_waiting_for_cid: FnvHashMap, SmallVec<[Arc; 1]>>, /// list of blocks received from blockstore or network, that connected peers may be waiting for outgoing_queue: VecDeque>, /// list of events to be sent back to swarm when poll is called @@ -54,7 +54,7 @@ where } enum TaskResult { - Get(PeerId, Vec>), + Get(Arc, Vec>), Cancelled, } @@ -151,7 +151,7 @@ where } } - fn schedule_store_get(&mut self, peer: PeerId, cids: Vec>) { + fn schedule_store_get(&mut self, peer: Arc, cids: Vec>) { let store = self.store.clone(); let (_handle, reg) = AbortHandle::new_pair(); @@ -166,10 +166,10 @@ where ); } - fn cancel_request(&mut self, peer: PeerId, cid: CidGeneric) { + fn cancel_request(&mut self, peer: Arc, cid: CidGeneric) { // remove peer from the waitlist for cid, in case we happen to get it later if let Entry::Occupied(mut entry) = self.peers_waiting_for_cid.entry(cid) { - if entry.get().as_ref() == [peer] { + if entry.get().as_ref() == [peer.clone()] { entry.remove(); } else { let peers = entry.get_mut(); @@ -197,16 +197,17 @@ where removals.len() ); + let peer = Arc::new(peer); for cid in &additions { self.peers_waiting_for_cid .entry(*cid) .or_default() - .push(peer); + .push(peer.clone()); } - self.schedule_store_get(peer, additions); + self.schedule_store_get(peer.clone(), additions); for cid in removals { - self.cancel_request(peer, cid); + self.cancel_request(peer.clone(), cid); } } @@ -229,7 +230,7 @@ where return false; } - let mut blocks_ready_for_peer = FnvHashMap::, Vec)>>::default(); + let mut blocks_ready_for_peer = FnvHashMap::, Vec<(Vec, Vec)>>::default(); while let Some((cid, data)) = self.outgoing_queue.pop_front() { let Some(peers_waiting) = self.peers_waiting_for_cid.remove(&cid) else { @@ -254,7 +255,7 @@ where ); for (peer, blocks) in blocks_ready_for_peer { self.outgoing_event_queue.push_back(ToSwarm::NotifyHandler { - peer_id: peer, + peer_id: *peer, handler: NotifyHandler::Any, event: ToHandlerEvent::QueueOutgoingMessages(blocks), }) @@ -263,7 +264,7 @@ where true } - fn process_store_get_results(&mut self, peer: PeerId, results: Vec>) { + fn process_store_get_results(&mut self, peer: Arc, results: Vec>) { for result in results { let cid = result.cid; match result.data {