Skip to content

Commit

Permalink
fix: better gossip messages (#94)
Browse files Browse the repository at this point in the history
Co-Authored-By: @SWvheerden

---------

Co-authored-by: SW van Heerden <[email protected]>
  • Loading branch information
stringhandler and SWvheerden authored Oct 21, 2024
1 parent 546255d commit a365f2a
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 60 deletions.
10 changes: 7 additions & 3 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
server::{
grpc::{error::Error, util, util::convert_coinbase_extra, MAX_ACCEPTABLE_GRPC_TIMEOUT},
http::stats_collector::StatsBroadcastClient,
p2p::{client::ServiceClient, Squad},
p2p::{client::ServiceClient, messages::NotifyNewTipBlock, Squad},
},
sharechain::{p2block::P2Block, BlockValidationParams, ShareChain},
};
Expand Down Expand Up @@ -125,9 +125,14 @@ where S: ShareChain
match share_chain.submit_block(block.clone()).await {
Ok(_) => {
let _ = self.stats_broadcast.send_miner_block_accepted(pow_algo);
let mut new_blocks = vec![(block.height, block.hash.clone())];
for uncle in block.uncles.iter() {
new_blocks.push(uncle.clone());
}
let notify = NotifyNewTipBlock::new(pow_algo, new_blocks);
let res = self
.p2p_client
.broadcast_block(block.clone())
.broadcast_block(notify)
.map_err(|error| Status::internal(error.to_string()));
if res.is_ok() {
info!(target: LOG_TARGET, "Broadcast new block: {:?}", block.hash.to_hex());
Expand Down Expand Up @@ -358,7 +363,6 @@ where S: ShareChain
let tari_hash = tari_block.header.hash();
tari_block.header.nonce = mined_nonce;
tari_block.header.pow.pow_data = temp_pow_data;
//todo dont remove, just peek
let mut p2pool_block = match pow_algo{
PowAlgorithm::Sha3x => self
.template_store_sha3x
Expand Down
12 changes: 5 additions & 7 deletions src/server/p2p/client.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::sync::Arc;

use tokio::sync::broadcast;

use crate::sharechain::p2block::P2Block;
use crate::server::p2p::messages::NotifyNewTipBlock;

pub struct ServiceClient {
broadcast_block_sender: broadcast::Sender<Arc<P2Block>>,
broadcast_block_sender: broadcast::Sender<NotifyNewTipBlock>,
}

impl ServiceClient {
pub fn new(broadcast_block_sender: broadcast::Sender<Arc<P2Block>>) -> Self {
pub fn new(broadcast_block_sender: broadcast::Sender<NotifyNewTipBlock>) -> Self {
Self { broadcast_block_sender }
}

/// Triggering broadcasting of a new block to p2pool network.
pub fn broadcast_block(&self, block: Arc<P2Block>) -> Result<(), anyhow::Error> {
self.broadcast_block_sender.send(block)?;
pub fn broadcast_block(&self, new_blocks: NotifyNewTipBlock) -> Result<(), anyhow::Error> {
self.broadcast_block_sender.send(new_blocks)?;

Ok(())
}
Expand Down
24 changes: 23 additions & 1 deletion src/server/p2p/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl PeerInfo {
) -> Self {
let timestamp = EpochTime::now();
Self {
version: 6,
version: 7,
current_sha3x_height,
current_random_x_height,
squad,
Expand Down Expand Up @@ -120,6 +120,28 @@ pub struct LocalShareChainSyncRequest {
pub request: ShareChainSyncRequest,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NotifyNewTipBlock {
pub version: u32,
pub algo: u64,
pub new_blocks: Vec<(u64, FixedHash)>,
}
impl_conversions!(NotifyNewTipBlock);

impl NotifyNewTipBlock {
pub fn new(algo: PowAlgorithm, new_blocks: Vec<(u64, FixedHash)>) -> Self {
Self {
version: 1,
algo: algo.as_u64(),
new_blocks,
}
}

pub fn algo(&self) -> PowAlgorithm {
PowAlgorithm::try_from(self.algo).unwrap()
}
}

impl LocalShareChainSyncRequest {
pub fn new(peer_id: PeerId, request: ShareChainSyncRequest) -> Self {
Self { peer_id, request }
Expand Down
4 changes: 2 additions & 2 deletions src/server/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ mod relay_store;
pub mod util;
pub const MAX_SNOOZES: usize = 5;
pub const MAX_SNOOZE_DURATION: Duration = Duration::from_secs(2); // 2 seconds, max 10 seconds of snoozing
pub const MIN_PEER_INFO_VERSION: u64 = 6;
pub const MIN_BLOCK_VERSION: u32 = 7;
pub const MIN_PEER_INFO_VERSION: u64 = 7;
pub const MIN_NOTIFY_VERSION: u32 = 1;
115 changes: 68 additions & 47 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use tokio::{
time::MissedTickBehavior,
};

use super::messages::{DirectPeerInfoRequest, DirectPeerInfoResponse};
use super::messages::{DirectPeerInfoRequest, DirectPeerInfoResponse, NotifyNewTipBlock};
use crate::{
server::{
config,
Expand All @@ -84,7 +84,7 @@ use crate::{
relay_store::RelayStore,
Error,
LibP2PError,
MIN_BLOCK_VERSION,
MIN_NOTIFY_VERSION,
MIN_PEER_INFO_VERSION,
},
},
Expand All @@ -95,7 +95,7 @@ use crate::{
};

const PEER_INFO_TOPIC: &str = "peer_info";
const NEW_BLOCK_TOPIC: &str = "new_block";
const BLOCK_NOTIFY_TOPIC: &str = "block_notify";
const SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL: &str = "/share_chain_sync/3";
const DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL: &str = "/tari_direct_peer_info/3";
const LOG_TARGET: &str = "tari::p2pool::server::p2p";
Expand Down Expand Up @@ -255,8 +255,8 @@ where S: ShareChain
query_rx: mpsc::Receiver<P2pServiceQuery>,
// service client related channels
// TODO: consider mpsc channels instead of broadcast to not miss any message (might drop)
client_broadcast_block_tx: broadcast::Sender<Arc<P2Block>>,
client_broadcast_block_rx: broadcast::Receiver<Arc<P2Block>>,
client_broadcast_block_tx: broadcast::Sender<NotifyNewTipBlock>,
client_broadcast_block_rx: broadcast::Receiver<NotifyNewTipBlock>,

relay_store: Arc<RwLock<RelayStore>>,
}
Expand All @@ -276,7 +276,7 @@ where S: ShareChain
let swarm = Self::new_swarm(config).await?;

// client related channels
let (broadcast_block_tx, broadcast_block_rx) = broadcast::channel::<Arc<P2Block>>(100);
let (broadcast_block_tx, broadcast_block_rx) = broadcast::channel::<NotifyNewTipBlock>(100);
// let (_share_chain_sync_tx, _share_chain_sync_rx) = broadcast::channel::<LocalShareChainSyncRequest>(1000);
// let (snooze_block_tx, snooze_block_rx) = mpsc::channel::<(usize, P2Block)>(1000);
let (query_tx, query_rx) = mpsc::channel(100);
Expand Down Expand Up @@ -481,23 +481,23 @@ where S: ShareChain
}

/// Broadcasting a new mined [`Block`] to the network (assume it is already validated with the network).
async fn broadcast_block(&mut self, result: Result<Arc<P2Block>, RecvError>) {
async fn broadcast_block(&mut self, result: Result<NotifyNewTipBlock, RecvError>) {
dbg!("Broadcast block");
// if self.sync_in_progress.load(Ordering::SeqCst) {
// return;
// }

match result {
Ok(block) => {
let block_raw_result: Result<Vec<u8>, Error> = (*block).clone().try_into();
let block_raw_result: Result<Vec<u8>, Error> = block.clone().try_into();
match block_raw_result {
Ok(block_raw) => {
match self
.swarm
.behaviour_mut()
.gossipsub
.publish(
IdentTopic::new(Self::squad_topic(&self.config.squad, NEW_BLOCK_TOPIC)),
IdentTopic::new(Self::squad_topic(&self.config.squad, BLOCK_NOTIFY_TOPIC)),
block_raw,
)
// .map_err(|error| Error::LibP2P(LibP2PError::Publish(error)))
Expand Down Expand Up @@ -560,7 +560,7 @@ where S: ShareChain
}
self.subscribe(PEER_INFO_TOPIC, false);
self.subscribe(PEER_INFO_TOPIC, true);
self.subscribe(NEW_BLOCK_TOPIC, true);
self.subscribe(BLOCK_NOTIFY_TOPIC, true);
}

/// Main method to handle any message comes from gossipsub.
Expand Down Expand Up @@ -619,38 +619,54 @@ where S: ShareChain
// TODO: send a signature that proves that the actual block was coming from this peer
// TODO: (sender peer's wallet address should be included always in the conibases with a fixed percent (like
// 20%))
topic if topic == Self::squad_topic(&self.config.squad, NEW_BLOCK_TOPIC) => {
topic if topic == Self::squad_topic(&self.config.squad, BLOCK_NOTIFY_TOPIC) => {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?}");

// if self.sync_in_progress.load(Ordering::SeqCst) {
// return;
// }
match P2Block::try_from(message) {
match NotifyNewTipBlock::try_from(message) {
Ok(mut payload) => {
payload.verified = false;
if payload.version < MIN_NOTIFY_VERSION {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
return;
}
let payload = Arc::new(payload);
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?} -> {payload:?}");

if payload.version < MIN_BLOCK_VERSION {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Block {} ({}) has an outdated version, skipping", payload.height, &payload.hash.to_hex());
return;
// If we don't have this peer, try do peer exchange
if self.network_peer_store.exists(&peer) {
self.initiate_direct_peer_exchange(peer).await;
}
info!(target: LOG_TARGET, squad = &self.config.squad; "🆕 New block from broadcast: {:?}", &payload.hash.to_hex());
let share_chain = match payload.original_block.header.pow.pow_algo {

// Don't add unless we've already synced.

info!(target: LOG_TARGET, squad = &self.config.squad; "🆕 New block from broadcast: {:?}", &payload.new_blocks.iter().map(|b| b.0.to_string()).join(","));
let algo = payload.algo();
let share_chain = match algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};

if let Ok(Some(missing_parents)) =
Service::<S>::try_add_propagated_block(&share_chain, payload.clone()).await
{
self.sync_share_chain(payload.original_block.header.pow.pow_algo, peer, missing_parents)
.await;
if share_chain.tip_height().await.unwrap_or_default() == 0 {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Share chain tip height is None, skipping block until we have synced");
return;
}
let mut missing_blocks = vec![];
for block in &payload.new_blocks {
if share_chain.has_block(block.0, &block.1).await {
continue;
}
missing_blocks.push(block.clone());
}
self.sync_share_chain(algo, peer, missing_blocks).await;
},
Err(error) => {
// TODO: elevate to error
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize broadcast block payload: {:?}", error);
self.network_peer_store.move_to_grey_list(
peer,
format!("Node sent a block that could not be deserialized: {:?}", error),
);
},
}
},
Expand All @@ -675,26 +691,7 @@ where S: ShareChain

match add_status {
AddPeerStatus::NewPeer => {
if let Ok(_my_info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
.await
.inspect_err(|error| {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to create peer info: {error:?}");
})
{
// let local_peer_id = self.swarm.local_peer_id().clone();
// TODO: Should we send them our details? The problem is that if we send too many of these, libp2p
// starts dropping requests with "libp2p_relay::priv_client::handler Dropping in-flight connect
// request because we are at capacity"

// self.swarm
// .behaviour_mut()
// .direct_peer_exchange
// .send_request(&peer, DirectPeerInfoRequest {
// info: my_info,
// peer_id: local_peer_id.to_base58(),
// });
}
// self.initiate_direct_peer_exchange(peer).await;
},
AddPeerStatus::Existing => {},
AddPeerStatus::Greylisted => {
Expand Down Expand Up @@ -737,6 +734,29 @@ where S: ShareChain
ControlFlow::Continue(())
}

async fn initiate_direct_peer_exchange(&mut self, peer: PeerId) {
if let Ok(my_info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
.await
.inspect_err(|error| {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to create peer info: {error:?}");
})
{
let local_peer_id = self.swarm.local_peer_id().clone();
// TODO: Should we send them our details? The problem is that if we send too many of these, libp2p
// starts dropping requests with "libp2p_relay::priv_client::handler Dropping in-flight connect
// request because we are at capacity"

self.swarm
.behaviour_mut()
.direct_peer_exchange
.send_request(&peer, DirectPeerInfoRequest {
info: my_info,
peer_id: local_peer_id.to_base58(),
});
}
}

async fn try_add_propagated_block(
share_chain: &Arc<S>,
block: Arc<P2Block>,
Expand Down Expand Up @@ -1285,9 +1305,9 @@ where S: ShareChain
}
},

block = self.client_broadcast_block_rx.recv() => {
blocks = self.client_broadcast_block_rx.recv() => {
dbg!("client broadcast");
self.broadcast_block(block).await;
self.broadcast_block(blocks).await;
},
event = self.swarm.select_next_some() => {
self.handle_event(event).await;
Expand Down Expand Up @@ -1350,7 +1370,6 @@ where S: ShareChain
PowAlgorithm::RandomX => self.share_chain_random_x.tip_height().await.unwrap_or_default(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.tip_height().await.unwrap_or_default(),
};
dbg!(self.swarm.local_peer_id());

// info!(target: LOG_TARGET, squad = &self.config.squad; "Best peers to sync: {best_peers:?}");

Expand All @@ -1362,6 +1381,8 @@ where S: ShareChain
};
if their_height > 0 {
let mut blocks_to_request = vec![];
dbg!(our_tip);
dbg!(their_height);
for i in (our_tip..=their_height).rev().take(self.config.max_blocks_to_request) {
blocks_to_request.push((i, FixedHash::zero()));
}
Expand Down
6 changes: 6 additions & 0 deletions src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ impl PeerStore {
}
}

pub fn exists(&self, peer_id: &PeerId) -> bool {
self.whitelist_peers.contains_key(&peer_id.to_base58()) ||
self.greylist_peers.contains_key(&peer_id.to_base58()) ||
self.blacklist_peers.contains(&peer_id.to_base58())
}

pub fn whitelist_peers(&self) -> &HashMap<String, PeerStoreRecord> {
&self.whitelist_peers
}
Expand Down
8 changes: 8 additions & 0 deletions src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,14 @@ impl ShareChain for InMemoryShareChain {
}
Ok(res)
}

async fn has_block(&self, height: u64, hash: &FixedHash) -> bool {
let chain_read_lock = self.p2_chain.read().await;
if let Some(level) = chain_read_lock.level_at_height(height) {
return level.blocks.contains_key(&hash);
}
false
}
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions src/sharechain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,6 @@ pub(crate) trait ShareChain: Send + Sync + 'static {
async fn get_target_difficulty(&self, height: u64) -> Difficulty;

async fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error>;

async fn has_block(&self, height: u64, hash: &FixedHash) -> bool;
}

0 comments on commit a365f2a

Please sign in to comment.