Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove sync store and set max-missing-blocks-depth #216

Merged
merged 4 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 71 additions & 28 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ pub(crate) struct Config {
pub randomx_enabled: bool,
pub num_concurrent_syncs: usize,
pub num_sync_tips_to_keep: usize,
pub max_missing_blocks_sync_depth: usize,
}

impl Default for Config {
Expand All @@ -197,15 +198,17 @@ impl Default for Config {
randomx_enabled: true,
num_concurrent_syncs: 1,
num_sync_tips_to_keep: 10,
max_missing_blocks_sync_depth: 8,
}
}
}

struct SyncShareChain {
/// Internal request to fetch blocks that are missing from the local share chain,
/// sent over `inner_request_tx` as `InnerRequest::DoSyncMissingBlocks`.
struct SyncMissingBlocks {
// Proof-of-work algorithm whose chain is being synced.
pub algo: PowAlgorithm,
// Peer to ask first for the missing blocks.
pub peer: PeerId,
// (height, hash) pairs of parent blocks we do not have locally.
pub missing_parents: Vec<(u64, FixedHash)>,
// True when triggered by a gossiped new-block notification rather than a sync response;
// affects how/where the request is sent (see sync_missing_blocks).
pub is_from_new_block_notify: bool,
// How many recursive missing-parent fetches led here; compared against
// config.max_missing_blocks_sync_depth to stop unbounded sync chains.
pub depth: usize,
}

struct PerformCatchUpSync {
Expand Down Expand Up @@ -283,7 +286,7 @@ pub(crate) struct ConnectionCounters {
}

enum InnerRequest {
DoSyncChain(SyncShareChain),
DoSyncMissingBlocks(SyncMissingBlocks),
PerformCatchUpSync(PerformCatchUpSync),
}

Expand Down Expand Up @@ -323,6 +326,7 @@ where S: ShareChain
randomx_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
sha3x_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
recent_synced_tips: HashMap<PowAlgorithm, Arc<RwLock<LruCache<PeerId, (u64, FixedHash)>>>>,
missing_blocks_sync_request_depth: HashMap<OutboundRequestId, usize>,
}

impl<S> Service<S>
Expand Down Expand Up @@ -388,6 +392,7 @@ where S: ShareChain
randomx_in_progress_syncs: HashMap::new(),
sha3x_in_progress_syncs: HashMap::new(),
recent_synced_tips,
missing_blocks_sync_request_depth: HashMap::new(),
})
}

Expand Down Expand Up @@ -682,15 +687,17 @@ where S: ShareChain
return Ok(MessageAcceptance::Accept);
}
info!(target: LOG_TARGET, squad = &self.config.squad; "We are missing less than 5 blocks, sending sync request with missing blocks to {}", propagation_source);
let sync_share_chain = SyncShareChain {
let sync_share_chain = SyncMissingBlocks {
algo,
peer: propagation_source,
missing_parents,
is_from_new_block_notify: true,
depth: 0,
};

let _unused =
self.inner_request_tx.send(InnerRequest::DoSyncChain(sync_share_chain));
let _unused = self
.inner_request_tx
.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
return Ok(MessageAcceptance::Accept);
}
},
Expand Down Expand Up @@ -990,7 +997,7 @@ where S: ShareChain

/// Handle share chain sync response.
/// All the responding blocks will be tried to put into local share chain.
async fn handle_sync_missing_blocks_response(&mut self, response: SyncMissingBlocksResponse) {
async fn handle_sync_missing_blocks_response(&mut self, response: SyncMissingBlocksResponse, depth: usize) {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "Share chain sync response: {response:?}");
let peer = *response.peer_id();

Expand All @@ -1017,20 +1024,26 @@ where S: ShareChain
info!(target: SYNC_REQUEST_LOG_TARGET, squad; "Received sync response for chain {} from {} with blocks {:?}", algo, peer, blocks.iter().map(|a| format!("{}({:x}{:x}{:x}{:x})",a.height, a.hash[0], a.hash[1], a.hash[2], a.hash[3])).collect::<Vec<String>>());
let tx = self.inner_request_tx.clone();
let peer_store = self.network_peer_store.clone();
let max_sync_depth = self.config.max_missing_blocks_sync_depth;
tokio::spawn(async move {
match share_chain.add_synced_blocks(&blocks).await {
Ok(new_tip) => {
info!(target: LOG_TARGET, squad; "[{:?}] Synced blocks added to share chain: {}",algo, new_tip);
let missing_parents = new_tip.into_missing_parents_vec();
if !missing_parents.is_empty() {
let sync_share_chain = SyncShareChain {
if depth + 1 > max_sync_depth {
info!(target: SYNC_REQUEST_LOG_TARGET, squad; "Sync depth reached max depth of {}", max_sync_depth);
return;
}
let sync_share_chain = SyncMissingBlocks {
algo,
peer,
missing_parents,
is_from_new_block_notify: false,
depth: depth + 1,
};

let _unused = tx.send(InnerRequest::DoSyncChain(sync_share_chain));
let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
}
},
Err(error) => {
Expand All @@ -1049,19 +1062,25 @@ where S: ShareChain

/// Trigger share chain sync with another peer with the highest known block height.
/// Note: this is a "stop-the-world" operation, many operations are skipped when synchronizing.
async fn sync_missing_blocks(&mut self, sync_share_chain: SyncShareChain) {
async fn sync_missing_blocks(&mut self, sync_share_chain: SyncMissingBlocks) {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Syncing share chain...");
let SyncShareChain {
let SyncMissingBlocks {
peer,
algo,
missing_parents,
is_from_new_block_notify,
depth,
} = sync_share_chain;

if depth + 1 > self.config.max_missing_blocks_sync_depth {
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "Sync depth reached max depth of {}", self.config.max_missing_blocks_sync_depth);
return;
}

if is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to connected peers for blocks {:?} from notify", algo, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to connected peers for blocks {:?} from sync", algo, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
debug!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to connected peers for blocks {:?} from sync", algo, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
}
if missing_parents.is_empty() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Sync called but with no missing parents.");
Expand All @@ -1073,11 +1092,12 @@ where S: ShareChain
if !is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "Sending sync request direct to peer {} for blocks {:?} because we did not receive it from sync", peer, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());

let _outbound_id = self
let outbound_id = self
.swarm
.behaviour_mut()
.share_chain_sync
.send_request(&peer, SyncMissingBlocksRequest::new(algo, missing_parents.clone()));
self.missing_blocks_sync_request_depth.insert(outbound_id, depth + 1);
return;
}
// ask our connected peers rather than everyone swarming the original peer
Expand Down Expand Up @@ -1111,10 +1131,11 @@ where S: ShareChain
}
}

let _outbound_id = self.swarm.behaviour_mut().share_chain_sync.send_request(
let outbound_id = self.swarm.behaviour_mut().share_chain_sync.send_request(
&connected_peer,
SyncMissingBlocksRequest::new(algo, missing_parents.clone()),
);
self.missing_blocks_sync_request_depth.insert(outbound_id, depth + 1);
peers_asked += 1;
}

Expand Down Expand Up @@ -1285,20 +1306,30 @@ where S: ShareChain
} => {
self.handle_sync_missing_blocks_request(channel, request, &peer).await;
},
request_response::Message::Response {
request_id: _,
response,
} => match response {
Ok(response) => {
self.handle_sync_missing_blocks_response(response).await;
},
Err(error) => {
error!(target: LOG_TARGET, squad = &self.config.squad; "REQ-RES share chain sync response error: {error:?}");
},
request_response::Message::Response { request_id, response } => {
let current_depth = self.missing_blocks_sync_request_depth.remove(&request_id);
if let Some(depth) = current_depth {
match response {
Ok(response) => {
self.handle_sync_missing_blocks_response(response, depth).await;
},
Err(error) => {
error!(target: LOG_TARGET, squad = &self.config.squad; "REQ-RES share chain sync response error: {error:?}");
},
}
} else {
warn!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "Received a response for a request that we didn't send: {peer:?} -> {response:?}");
return;
}
},
},
request_response::Event::OutboundFailure { peer, error, .. } => {
request_response::Event::OutboundFailure {
peer,
error,
request_id,
} => {
debug!(target: LOG_TARGET, squad = &self.config.squad; "REQ-RES outbound failure: {peer:?} -> {error:?}");
self.missing_blocks_sync_request_depth.remove(&request_id);
let mut should_grey_list = true;
match error {
OutboundFailure::DialFailure => {
Expand Down Expand Up @@ -1577,6 +1608,8 @@ where S: ShareChain
let squad = self.config.squad.clone();
let network_peer_store = self.network_peer_store.clone();
let recent_synced_tips = self.recent_synced_tips.get(&algo).cloned().unwrap();
// TODO: Remove
self.print_debug_chain_graph().await;

tokio::spawn(async move {
blocks.sort_by(|a, b| a.height.cmp(&b.height));
Expand Down Expand Up @@ -1610,13 +1643,14 @@ where S: ShareChain
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, new_tip);
let missing_parents = new_tip.into_missing_parents_vec();
if !missing_parents.is_empty() {
let sync_share_chain = SyncShareChain {
let sync_share_chain = SyncMissingBlocks {
algo,
peer,
missing_parents,
is_from_new_block_notify: false,
depth: 0,
};
let _unused = tx.send(InnerRequest::DoSyncChain(sync_share_chain));
let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
}

info!(target: SYNC_REQUEST_LOG_TARGET, squad = &squad; "Synced blocks added to share chain");
Expand Down Expand Up @@ -1645,7 +1679,15 @@ where S: ShareChain
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "Catch up sync completed for chain {} from {} after {} catchups", algo, peer, num_catchups.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()));
// this only gets called after sync completes, lets set synced status = true
synced_bool.store(true, std::sync::atomic::Ordering::SeqCst);
let (max_known_network_height, max_known_network_pow, peer_with_best) =
peer_store_write_lock.max_known_network_height(algo);

if our_pow.as_u128() > max_known_network_pow {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is greater than max known network pow, we are now synced", algo);
synced_bool.store(true, std::sync::atomic::Ordering::SeqCst);
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is less than max known network pow, we are not synced, will continue to search for better POW. Best peer is {} at height {}", algo, peer_with_best.map(|p| p.to_base58()).unwrap_or_else(|| "None".to_string()), max_known_network_height);
}
peer_store_write_lock.reset_catch_up_attempts(&peer);
}
if timer.elapsed() > MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT {
Expand Down Expand Up @@ -1817,6 +1859,7 @@ where S: ShareChain
.write()
.await
.update_last_sync_attempt(peer, algo);

Ok(())
}

Expand Down Expand Up @@ -2000,7 +2043,7 @@ where S: ShareChain

async fn handle_inner_request(&mut self, req: InnerRequest) {
match req {
InnerRequest::DoSyncChain(sync_chain) => {
InnerRequest::DoSyncMissingBlocks(sync_chain) => {
self.sync_missing_blocks(sync_chain).await;
},
InnerRequest::PerformCatchUpSync(perform_catch_up_sync) => {
Expand Down
30 changes: 29 additions & 1 deletion src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
use anyhow::Error;
use libp2p::PeerId;
use log::warn;
use tari_core::proof_of_work::PowAlgorithm;
use tari_core::proof_of_work::{AccumulatedDifficulty, PowAlgorithm};
use tari_utilities::epoch_time::EpochTime;

use crate::server::{http::stats_collector::StatsBroadcastClient, p2p::messages::PeerInfo, PROTOCOL_VERSION};
Expand Down Expand Up @@ -123,6 +123,34 @@ impl PeerStore {
}
}

/// Returns the best chain known among whitelisted peers for the given algorithm,
/// as `(height, accumulated_pow, peer_id)` of the peer reporting the highest
/// accumulated proof of work. Returns `(0, 0, None)` when no peer reports any pow.
pub fn max_known_network_height(&self, algo: PowAlgorithm) -> (u64, u128, Option<PeerId>) {
    let mut best_height: u64 = 0;
    let mut best_pow: u128 = 0;
    let mut best_peer: Option<PeerId> = None;

    for record in self.whitelist_peers.values() {
        // Select the (pow, height) pair reported by this peer for the requested algorithm.
        let (reported_pow, reported_height) = match algo {
            PowAlgorithm::RandomX => (
                record.peer_info.current_random_x_pow,
                record.peer_info.current_random_x_height,
            ),
            PowAlgorithm::Sha3x => (
                record.peer_info.current_sha3x_pow,
                record.peer_info.current_sha3x_height,
            ),
        };
        // Strictly-greater comparison: on ties the first peer seen wins,
        // matching the previous implementation.
        if reported_pow > best_pow {
            best_pow = reported_pow;
            best_height = reported_height;
            best_peer = Some(record.peer_id.clone());
        }
    }

    (best_height, best_pow, best_peer)
}

pub fn add_seed_peers(&mut self, mut peer_ids: Vec<PeerId>) {
self.seed_peers.append(&mut peer_ids);
}
Expand Down
Loading
Loading