Skip to content

Commit

Permalink
Merge branch 'development' of github.com:tari-project/sha-p2pool into…
Browse files Browse the repository at this point in the history
… development
  • Loading branch information
stringhandler committed Nov 22, 2024
2 parents 132d766 + 5a26674 commit 95b2525
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 22 deletions.
32 changes: 16 additions & 16 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ where S: ShareChain
return Ok(MessageAcceptance::Reject);
}

info!(target: LOG_TARGET, squad = &self.config.squad; "🆕 New block from broadcast: {:?}", &payload.new_blocks.iter().map(|b| b.height.to_string()).collect::<Vec<String>>());
info!(target: LOG_TARGET, squad = &self.config.squad; "[{:?}]🆕 New block from broadcast: {:?}", &payload.new_blocks.first().unwrap().original_header.pow.pow_algo ,&payload.new_blocks.iter().map(|b| b.height.to_string()).collect::<Vec<String>>());
// info!(target: LOG_TARGET, squad = &self.config.squad; "🆕 New blocks from broadcast:
// {:?}", &payload.new_blocks.iter().map(|b| b.hash.to_hex()).collect::<Vec<String>>());
let algo = payload.algo();
Expand Down Expand Up @@ -1006,17 +1006,6 @@ where S: ShareChain
missing_parents,
is_from_new_block_notify,
} = sync_share_chain;
let peer_store_read_lock = self.network_peer_store.read().await;
if peer_store_read_lock.is_blacklisted(&peer) {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer is blacklisted, skipping sync");
return;
}

if !peer_store_read_lock.is_whitelisted(&peer) {
info!(target: LOG_TARGET, squad = &self.config.squad; "Peer is not whitelisted, will still try to sync");
// return;
}
drop(peer_store_read_lock);

if is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to connected peers for blocks {:?} from notify", algo, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height.to_string(), hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
Expand All @@ -1041,13 +1030,24 @@ where S: ShareChain
// };

// ask our connected peers rather than everyone swarming the original peer
let mut sent_to_original_peer = false;
let connected_peers: Vec<_> = self.swarm.connected_peers().cloned().collect();
for connected_peer in connected_peers {
if connected_peer == peer {
sent_to_original_peer = true;
}
let _outbound_id = self.swarm.behaviour_mut().share_chain_sync.send_request(
&connected_peer,
ShareChainSyncRequest::new(algo, missing_parents.clone()),
);
}
if !sent_to_original_peer && !is_from_new_block_notify {
let _outbound_id = self
.swarm
.behaviour_mut()
.share_chain_sync
.send_request(&peer, ShareChainSyncRequest::new(algo, missing_parents.clone()));
}
}

/// Main method to handle libp2p events.
Expand Down Expand Up @@ -1442,7 +1442,7 @@ where S: ShareChain
let their_tip_hash = *response.tip_hash();
let their_height = response.tip_height();
let their_pow = response.achieved_pow();
let blocks: Vec<_> = response.into_blocks().into_iter().map(Arc::new).collect();
let mut blocks: Vec<_> = response.into_blocks().into_iter().map(Arc::new).collect();
info!(target: SYNC_REQUEST_LOG_TARGET, "Received catch up sync response for chain {} from {} with blocks {}. Their tip: {}:{}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", "), their_height, &their_tip_hash.to_hex()[0..8]);
if blocks.is_empty() {
return;
Expand All @@ -1453,6 +1453,7 @@ where S: ShareChain
let synced_bool = self.are_we_synced_with_p2pool.clone();

tokio::spawn(async move {
blocks.sort_by(|a, b| a.height.cmp(&b.height));
let last_block_from_them = blocks.last().map(|b| (b.height, b.hash));
let mut missing_blocks = HashSet::new();
for b in &blocks {
Expand All @@ -1463,7 +1464,6 @@ where S: ShareChain
Err(error) => match error {
crate::sharechain::error::ShareChainError::BlockParentDoesNotExist { missing_parents } => {
// This should not happen though, catchup should return all blocks
warn!(target: SYNC_REQUEST_LOG_TARGET, squad; "Catchup sync Reporting missing blocks {}", missing_parents.len());
for (height, hash) in missing_parents {
missing_blocks.insert((height, hash));
}
Expand All @@ -1487,19 +1487,19 @@ where S: ShareChain
};
}
if missing_blocks.len() > 0 {
warn!(target: SYNC_REQUEST_LOG_TARGET, squad; "Catchup sync Reporting missing blocks {}", missing_blocks.len());
let sync_share_chain = SyncShareChain {
algo,
peer,
missing_parents: missing_blocks.into_iter().collect(),
is_from_new_block_notify: false,
};
let _ = tx.send(InnerRequest::DoSyncChain(sync_share_chain));
return;
}

info!(target: SYNC_REQUEST_LOG_TARGET, squad = &squad; "Synced blocks added to share chain");
let our_pow = share_chain.get_total_chain_pow().await;
let mut must_continue_sync = missing_blocks.is_empty() && their_pow > our_pow.as_u128();
let mut must_continue_sync = their_pow > our_pow.as_u128();
info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] must continue: {}", algo, must_continue_sync);
// Check if we have recieved their tip
if blocks.iter().any(|b| b.hash == their_tip_hash) {
Expand Down
2 changes: 0 additions & 2 deletions src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{
io::{BufReader, Write},
path::Path,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -17,7 +16,6 @@ use log::warn;
use tari_core::proof_of_work::PowAlgorithm;
use tari_utilities::epoch_time::EpochTime;

use super::messages::NotifyNewTipBlock;
use crate::server::{http::stats_collector::StatsBroadcastClient, p2p::messages::PeerInfo, PROTOCOL_VERSION};

const LOG_TARGET: &str = "tari::p2pool::peer_store";
Expand Down
6 changes: 2 additions & 4 deletions src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use super::{
UNCLE_REWARD_SHARE,
};
use crate::{
main,
server::{http::stats_collector::StatsBroadcastClient, PROTOCOL_VERSION},
sharechain::{
error::{ShareChainError, ValidationError},
Expand Down Expand Up @@ -329,7 +328,6 @@ impl InMemoryShareChain {
}
};


loop {
for block in level.blocks.values() {
if main_chain_only {
Expand Down Expand Up @@ -698,7 +696,7 @@ impl ShareChain for InMemoryShareChain {
if let Some(last_block_received) = last_block_received {
if let Some(level) = p2_chain_read.level_at_height(last_block_received.0) {
if let Some(block) = level.blocks.get(&last_block_received.1) {
split_height = block.height;
split_height = block.height.saturating_add(1);
}
}
}
Expand All @@ -715,7 +713,7 @@ impl ShareChain for InMemoryShareChain {
if let Some(block) = level.blocks.get(&their_block.1) {
// Only split if the block is in the main chain
if level.chain_block == block.hash {
split_height2 = block.height;
split_height2 = block.height.saturating_add(1);
break;
}
}
Expand Down

0 comments on commit 95b2525

Please sign in to comment.