
Commit

Merge remote-tracking branch 'origin/development' into st-do-catchup-from-connected-peers
stringhandler committed Nov 21, 2024
2 parents 4846c25 + 9f6f61b commit f4db865
Showing 2 changed files with 46 additions and 99 deletions.
122 changes: 44 additions & 78 deletions src/server/p2p/network.rs
@@ -273,7 +273,6 @@ pub(crate) struct ConnectedPeerInfo {
pub peer_id: String,
pub peer_info: PeerInfo,
pub last_grey_list_reason: Option<String>,
pub last_notify: Option<NotifyNewTipBlock>,
pub last_ping: Option<EpochTime>, /* peer_addresses: Vec<Multiaddr>,
* is_pending: bol, */
}
@@ -525,7 +524,11 @@ where S: ShareChain

/// Main method to handle any message comes from gossipsub.
#[allow(clippy::too_many_lines)]
async fn handle_new_gossipsub_message(&mut self, message: Message) -> Result<MessageAcceptance, Error> {
async fn handle_new_gossipsub_message(
&mut self,
message: Message,
propagation_source: PeerId,
) -> Result<MessageAcceptance, Error> {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "New gossipsub message: {message:?}");
let _ = self.stats_broadcast_client.send_gossipsub_message_received();
let source_peer = message.source;
@@ -583,11 +586,7 @@ where S: ShareChain
}
// lets check age
// if this timestamp is older than 60 seconds, we reject it
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(20) {
info!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", source_peer);
return Ok(MessageAcceptance::Reject);
}
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(10) {
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) {
info!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", source_peer);
return Ok(MessageAcceptance::Ignore);
}
@@ -606,15 +605,6 @@ where S: ShareChain
}
info!(target: NEW_TIP_NOTIFY_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {source_peer:?} -> {payload:?}");

if self.network_peer_store.read().await.is_whitelisted(&source_peer) {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Received a block from a peer {}, but it is not whitelisted. Will process anyway, but may not be able to switch to this chain. Heights:{}", source_peer, &payload.new_blocks.iter().map(|b| b.height.to_string()).join(","));
// return Ok(MessageAcceptance::Accept);
}
self.network_peer_store
.write()
.await
.add_last_new_tip_notify(&source_peer, payload.clone());

// If we don't have this peer, try do peer exchange
// if !self.network_peer_store.exists(message_peer) {
// self.initiate_direct_peer_exchange(message_peer).await;
@@ -661,25 +651,7 @@ where S: ShareChain
.map(|payload| payload.height)
.max()
.unwrap_or(0);
// Either the tip is too far ahead and we need to catch up, or the chain is below ours but
// with a higher proof of work, so we then need to reorg to that chain.
// In both cases catchup sync will be better than simple sync
if max_payload_height > our_tip.saturating_add(2) ||
max_payload_height < our_tip.saturating_sub(5)
{
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is too far ahead, we need to initiate chatchup sync", message_peer);
let perform_catch_up_sync = PerformCatchUpSync {
algo,
peer: *message_peer,
last_block_from_them: None,
their_height: max_payload_height,
};
let _ = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
// .await;
return Ok(MessageAcceptance::Ignore);
}

let mut blocks: Vec<P2Block> = payload.new_blocks.iter().cloned().collect();
for block in &mut blocks {
block.verified = false;
@@ -692,9 +664,15 @@ where S: ShareChain
Err(crate::sharechain::error::ShareChainError::BlockParentDoesNotExist {
missing_parents,
}) => {
let num_missing_parents = missing_parents.len();
if num_missing_parents > 5 {
info!(target: LOG_TARGET, squad = &self.config.squad; "We are missing more than 5 blocks, we are missing: {}", num_missing_parents);
return Ok(MessageAcceptance::Accept);
}
info!(target: LOG_TARGET, squad = &self.config.squad; "We are missing less than 5 blocks, sending sync request with missing blocks to {}", propagation_source);
let sync_share_chain = SyncShareChain {
algo,
peer: *message_peer,
peer: propagation_source,
missing_parents,
is_from_new_block_notify: false,
};
@@ -1150,7 +1128,7 @@ where S: ShareChain
message,
message_id,
propagation_source,
} => match self.handle_new_gossipsub_message(message).await {
} => match self.handle_new_gossipsub_message(message, propagation_source).await {
Ok(res) => {
let _ = self.swarm.behaviour_mut().gossipsub.report_message_validation_result(
&message_id,
@@ -1274,31 +1252,8 @@ where S: ShareChain
self.handle_catch_up_sync_request(channel, request).await;
},
request_response::Message::Response { request_id, response } => {
let should_remove: Vec<PeerId> = self
.randomx_in_progress_syncs
.iter()
.filter_map(|(peer, r)| if r.0 == request_id { Some(peer.clone()) } else { None })
.collect();
for peer in should_remove {
if let Some((_r, permit)) = self.randomx_in_progress_syncs.remove(&peer) {
// Probably don't need to do this
drop(permit);
}
}

let should_remove: Vec<PeerId> = self
.sha3x_in_progress_syncs
.iter()
.filter_map(|(peer, r)| if r.0 == request_id { Some(peer.clone()) } else { None })
.collect();
for peer in should_remove {
if let Some((_r, permit)) = self.sha3x_in_progress_syncs.remove(&peer) {
// Probably don't need to do this
drop(permit);
}
}

self.handle_catch_up_sync_response(response).await;
self.release_catchup_sync_permit(request_id);
},
},
request_response::Event::OutboundFailure {
@@ -1439,6 +1394,32 @@ where S: ShareChain
});
}

fn release_catchup_sync_permit(&mut self, request_id: OutboundRequestId) {
let should_remove: Vec<PeerId> = self
.randomx_in_progress_syncs
.iter()
.filter_map(|(peer, r)| if r.0 == request_id { Some(peer.clone()) } else { None })
.collect();
for peer in should_remove {
if let Some((_r, permit)) = self.randomx_in_progress_syncs.remove(&peer) {
// Probably don't need to do this
drop(permit);
}
}

let should_remove: Vec<PeerId> = self
.sha3x_in_progress_syncs
.iter()
.filter_map(|(peer, r)| if r.0 == request_id { Some(peer.clone()) } else { None })
.collect();
for peer in should_remove {
if let Some((_r, permit)) = self.sha3x_in_progress_syncs.remove(&peer) {
// Probably don't need to do this
drop(permit);
}
}
}

async fn handle_catch_up_sync_response(&mut self, response: CatchUpSyncResponse) {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "Catch up sync response: {response:?}");
if response.version != PROTOCOL_VERSION {
@@ -1764,7 +1745,6 @@ where S: ShareChain
peer_id: p.to_string(),
peer_info: info.peer_info.clone(),
last_grey_list_reason: info.last_grey_list_reason.clone(),
last_notify: info.last_new_tip_notify.as_ref().map(|e| e.as_ref().clone()),
last_ping: info.last_ping,
});
}
@@ -1775,7 +1755,6 @@ where S: ShareChain
peer_id: p.to_string(),
peer_info: info.peer_info.clone(),
last_grey_list_reason: info.last_grey_list_reason.clone(),
last_notify: info.last_new_tip_notify.as_ref().map(|e| e.as_ref().clone()),
last_ping: info.last_ping,
});
}
@@ -1792,7 +1771,6 @@ where S: ShareChain
peer_id: p.to_string(),
peer_info: peer_info.peer_info,
last_grey_list_reason: peer_info.last_grey_list_reason,
last_notify: peer_info.last_new_tip_notify.as_ref().map(|e| e.as_ref().clone()),
last_ping: peer_info.last_ping,
};
res.push(p);
@@ -1915,9 +1893,7 @@ where S: ShareChain
tokio::time::interval(Duration::from_secs(60 * 60 * 24))
};
debug_chain_graph.set_missed_tick_behavior(MissedTickBehavior::Skip);
// TODO: Not sure why this is done on a loop instead of just once....
// let mut kademlia_bootstrap_interval = tokio::time::interval(Duration::from_secs(12 * 60 * 60));
// kademlia_bootstrap_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let shutdown_signal = self.shutdown_signal.clone();
tokio::pin!(shutdown_signal);

@@ -2135,27 +2111,17 @@ where S: ShareChain
}

for record in best_peers {
let (their_height, their_pow, last_sync_attempt) = match algo {
let (their_height, their_pow) = match algo {
PowAlgorithm::RandomX => (
record.peer_info.current_random_x_height,
AccumulatedDifficulty::from_u128(record.peer_info.current_random_x_pow).unwrap_or_default(),
record.last_rx_sync_attempt,
),
PowAlgorithm::Sha3x => (
record.peer_info.current_sha3x_height,
AccumulatedDifficulty::from_u128(record.peer_info.current_sha3x_pow).unwrap_or_default(),
record.last_sha3x_sync_attempt,
),
};

if last_sync_attempt
.map(|d| d.elapsed() < CATCHUP_SYNC_TIMEOUT)
.unwrap_or(false)
{
info!(target: LOG_TARGET, squad = &self.config.squad; "Already in progress with sync from {}", record.peer_id);
continue;
}

if their_pow > our_pow {
peer_with_better_pow = true;
info!(target: LOG_TARGET, squad = &self.config.squad; "[{:?}] Trying to perform catchup sync from peer: {} with height {}", algo,record.peer_id, their_height);
23 changes: 2 additions & 21 deletions src/server/p2p/peer_store.rs
@@ -49,7 +49,6 @@ pub(crate) struct PeerStoreRecord {
pub num_grey_listings: u64,
pub last_grey_list_reason: Option<String>,
pub catch_up_attempts: u64,
pub last_new_tip_notify: Option<Arc<NotifyNewTipBlock>>,
pub last_ping: Option<EpochTime>,
}

@@ -64,7 +63,6 @@ impl PeerStoreRecord {
created: Instant::now(),
last_grey_list_reason: None,
catch_up_attempts: 0,
last_new_tip_notify: None,
last_ping: None,
}
}
@@ -75,12 +73,8 @@ impl PeerStoreRecord {
}

pub fn last_seen(&self) -> EpochTime {
self.last_ping.unwrap_or_else(|| {
self.last_new_tip_notify
.as_ref()
.map(|n| EpochTime::from(n.timestamp))
.unwrap_or(EpochTime::from(self.peer_info.timestamp))
})
self.last_ping
.unwrap_or_else(|| EpochTime::from(self.peer_info.timestamp))
}
}

@@ -159,19 +153,6 @@ impl PeerStore {
.map(|record| record.catch_up_attempts as usize)
}

pub fn add_last_new_tip_notify(&mut self, peer_id: &PeerId, notify: Arc<NotifyNewTipBlock>) {
if let Some(entry) = self.whitelist_peers.get_mut(&peer_id.to_base58()) {
let mut new_record = entry.clone();
new_record.last_new_tip_notify = Some(notify.clone());
*entry = new_record;
}
if let Some(entry) = self.greylist_peers.get_mut(&peer_id.to_base58()) {
let mut new_record = entry.clone();
new_record.last_new_tip_notify = Some(notify);
*entry = new_record;
}
}

pub fn add_catch_up_attempt(&mut self, peer_id: &PeerId) {
if let Some(entry) = self.whitelist_peers.get_mut(&peer_id.to_base58()) {
let mut new_record = entry.clone();
