feat: only miner broadcasts new tip (#162)
* Now only the miner will broadcast the new-tip notification
* Aggressively try to connect to 16 peers
* Temporarily change the inner request queue to an unbounded channel (see the sketch below)
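
The channel swap is the most mechanical of the three changes: tokio's bounded mpsc::channel hands out a fixed pool of permits and its send() is async, so a permit-starved queue stalls the sender, while mpsc::unbounded_channel never waits. A minimal sketch of the difference, assuming only tokio (the message values and names are illustrative, not from this codebase):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded (previous behaviour): send() awaits a permit, so a full or
    // permit-starved queue stalls the sender -- the symptom this commit works around.
    let (bounded_tx, mut bounded_rx) = mpsc::channel::<&str>(100);
    bounded_tx.send("request").await.expect("receiver alive");
    assert_eq!(bounded_rx.recv().await, Some("request"));

    // Unbounded (this commit): send() is synchronous and never waits, at the
    // cost of a queue that can grow without limit if the receiver falls behind.
    let (unbounded_tx, mut unbounded_rx) = mpsc::unbounded_channel::<&str>();
    unbounded_tx.send("request").expect("receiver alive");
    assert_eq!(unbounded_rx.recv().await, Some("request"));
}

This is why, in the diff below, mpsc::Sender/Receiver become UnboundedSender/UnboundedReceiver and every .send(...) on that channel loses its .await.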
stringhandler authored Nov 16, 2024
1 parent 8c17d70 commit b2d4712
Showing 3 changed files with 74 additions and 64 deletions.
135 changes: 72 additions & 63 deletions src/server/p2p/network.rs
@@ -3,7 +3,7 @@

use std::{
collections::HashMap,
fmt::Display,
fmt::{format, Display},
fs,
hash::Hash,
io::Write,
@@ -16,40 +16,33 @@ use std::{
};

use anyhow::{anyhow, Error};
use blake2::Blake2b;
use convert_case::{Case, Casing};
use digest::{consts::U32, generic_array::GenericArray, Digest};
use hickory_resolver::{
config::{ResolverConfig, ResolverOpts},
TokioAsyncResolver,
};
use itertools::Itertools;
use libp2p::{
autonat::{self, NatStatus, OutboundProbeEvent},
connection_limits::{self, ConnectionLimits},
connection_limits::{self},
dcutr,
futures::StreamExt,
gossipsub::{self, IdentTopic, Message, MessageAcceptance, MessageId, PublishError},
gossipsub::{self, IdentTopic, Message, MessageAcceptance, PublishError},
identify::{self, Info},
identity::Keypair,
mdns::{self, tokio::Tokio},
multiaddr::Protocol,
noise,
ping,
relay,
request_response::{self, cbor, OutboundFailure, OutboundRequestId, ResponseChannel},
swarm::{
behaviour::toggle::Toggle,
dial_opts::{DialOpts, PeerCondition},
ExternalAddrExpired,
NetworkBehaviour,
SwarmEvent,
},
tcp,
yamux,
Multiaddr,
PeerId,
StreamProtocol,
Swarm,
};
use log::{
@@ -67,8 +60,6 @@ use tari_core::proof_of_work::{AccumulatedDifficulty, PowAlgorithm};
use tari_shutdown::ShutdownSignal;
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
use tokio::{
fs::File,
io::{self, AsyncReadExt, AsyncWriteExt},
select,
sync::{
broadcast::{self, error::RecvError},
@@ -98,7 +89,7 @@ use crate::{
p2p::{
client::ServiceClient,
messages::{self, PeerInfo, ShareChainSyncRequest, ShareChainSyncResponse},
peer_store::{AddPeerStatus, PeerStore},
peer_store::{self, AddPeerStatus, PeerStore},
relay_store::RelayStore,
},
PROTOCOL_VERSION,
@@ -204,8 +195,8 @@ impl Default for Config {
relay_server_disabled: false,
squad: Squad::from("default".to_string()),
user_agent: "tari-p2pool".to_string(),
grey_list_clear_interval: Duration::from_secs(60 * 5),
black_list_clear_interval: Duration::from_secs(60 * 15),
grey_list_clear_interval: Duration::from_secs(60 * 15),
black_list_clear_interval: Duration::from_secs(60 * 60),
sync_interval: Duration::from_secs(10),
is_seed_peer: false,
debug_print_chain: false,
@@ -345,8 +336,8 @@ where S: ShareChain
// TODO: consider mpsc channels instead of broadcast to not miss any message (might drop)
client_broadcast_block_tx: broadcast::Sender<NotifyNewTipBlock>,
client_broadcast_block_rx: broadcast::Receiver<NotifyNewTipBlock>,
inner_request_tx: mpsc::Sender<InnerRequest>,
inner_request_rx: mpsc::Receiver<InnerRequest>,
inner_request_tx: mpsc::UnboundedSender<InnerRequest>,
inner_request_rx: mpsc::UnboundedReceiver<InnerRequest>,

relay_store: Arc<RwLock<RelayStore>>,
are_we_synced_with_p2pool: Arc<AtomicBool>,
@@ -378,7 +369,8 @@ where S: ShareChain
// let (_share_chain_sync_tx, _share_chain_sync_rx) = broadcast::channel::<LocalShareChainSyncRequest>(1000);
// let (snooze_block_tx, snooze_block_rx) = mpsc::channel::<(usize, P2Block)>(1000);
let (query_tx, query_rx) = mpsc::channel(100);
let (inner_request_tx, inner_request_rx) = mpsc::channel(100);
// This should not be unbounded but we need to find out what is using up all the permits
let (inner_request_tx, inner_request_rx) = mpsc::unbounded_channel();

Ok(Self {
swarm,
@@ -687,8 +679,8 @@ where S: ShareChain
};
let _ = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync))
.await;
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
// .await;
return Ok(MessageAcceptance::Ignore);
}

@@ -706,23 +698,18 @@
missing_parents: missing_blocks,
is_from_new_block_notify: true,
};
let _ = self
.inner_request_tx
.send(InnerRequest::SyncChainResponse(sync_share_chain))
.await;
let _ = self.inner_request_tx;
}

return Ok(MessageAcceptance::Accept);
},
Err(error) => {
// TODO: elevate to error
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize broadcast block payload: {:?}", error);
self.network_peer_store
.move_to_grey_list(
source_peer,
format!("Node sent a block that could not be deserialized: {:?}", error),
)
.await;
self.network_peer_store.move_to_grey_list(
source_peer,
format!("Node sent a block that could not be deserialized: {:?}", error),
);
return Ok(MessageAcceptance::Reject);
},
}
@@ -928,11 +915,7 @@ where S: ShareChain
return;
},
};
if tx
.send(InnerRequest::SyncChainRequest((channel, response)))
.await
.is_err()
{
if tx.send(InnerRequest::SyncChainRequest((channel, response))).is_err() {
error!(target: LOG_TARGET, squad; "Failed to send block sync response");
}
});
@@ -976,7 +959,8 @@ where S: ShareChain
return;
};
let total_pow = share_chain.get_total_chain_pow().await;
let _ = new_tip_notify.send(NotifyNewTipBlock::new(local_peer_id, algo, new_blocks, total_pow));
// let _ = new_tip_notify.send(NotifyNewTipBlock::new(local_peer_id, algo, new_blocks,
// total_pow));
}
},
Err(error) => match error {
@@ -988,17 +972,15 @@
is_from_new_block_notify: false,
};

let _ = tx.send(InnerRequest::SyncChainResponse(sync_share_chain)).await;
let _ = tx.send(InnerRequest::SyncChainResponse(sync_share_chain));
return;
},
_ => {
error!(target: LOG_TARGET, squad; "Failed to add synced blocks to share chain: {error:?}");
let _ = tx
.send(InnerRequest::GreyListPeer(
peer,
format!("Block failed validation: {error}"),
))
.await;
let _ = tx.send(InnerRequest::GreyListPeer(
peer,
format!("Block failed validation: {error}"),
));
},
},
};
@@ -1110,6 +1092,13 @@ where S: ShareChain
warn!(target: LOG_TARGET, squad = &self.config.squad; "External address has expired: {address:?}. TODO: Do we need to create a new one?");
self.attempt_relay_reservation().await;
},
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = peer_id {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Outgoing connection error: {peer_id:?} -> {error:?}");
self.network_peer_store
.move_to_grey_list(peer_id, format!("Outgoing connection error: {error}"));
}
},
SwarmEvent::Behaviour(event) => match event {
ServerNetworkBehaviourEvent::Mdns(mdns_event) => match mdns_event {
mdns::Event::Discovered(peers) => {
@@ -1227,8 +1216,7 @@
}
if should_grey_list {
self.network_peer_store
.move_to_grey_list(peer, format!("Error during share chain sync:{}", error))
.await;
.move_to_grey_list(peer, format!("Error during share chain sync:{}", error));
}

// Remove peer from peer store to try to sync from another peer,
@@ -1331,8 +1319,7 @@
}
if should_grey_list {
self.network_peer_store
.move_to_grey_list(peer, format!("Error during catch up sync:{}", error))
.await;
.move_to_grey_list(peer, format!("Error during catch up sync:{}", error));
}
// Remove peer from peer store to try to sync from another peer,
// if the peer goes online/accessible again, the peer store will have it again.
@@ -1419,9 +1406,7 @@
tip: our_tip,
achieved_pow: our_achieved_pow,
};
let _ = tx
.send(InnerRequest::CatchUpSyncRequest((channel, catch_up_sync)))
.await;
let _ = tx.send(InnerRequest::CatchUpSyncRequest((channel, catch_up_sync)));
});
}

@@ -1468,17 +1453,17 @@
info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] num catchups: {:?}", algo, num_catchups);
if must_continue_sync && num_catchups.map(|n| n < MAX_CATCH_UP_ATTEMPTS).unwrap_or(true) {
info!(target: SYNC_REQUEST_LOG_TARGET, "Continue catch up for {}", peer);
let _ = tx.send(InnerRequest::AddCatchUpAttempt(peer)).await;
let _ = tx.send(InnerRequest::AddCatchUpAttempt(peer));
let perform_catch_up_sync = PerformCatchUpSync {
algo,
peer,
last_block_from_them,
their_height,
};
let _ = tx.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync)).await;
let _ = tx.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "Catch up sync completed for chain {} from {} after {} catchups", algo, peer, num_catchups.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()));
let _ = tx.send(InnerRequest::ResetCatchUpAttempts(peer)).await;
let _ = tx.send(InnerRequest::ResetCatchUpAttempts(peer));
}
},
Err(error) => match error {
@@ -1491,17 +1476,15 @@
missing_parents,
is_from_new_block_notify: false,
};
let _ = tx.send(InnerRequest::SyncChainResponse(sync_share_chain)).await;
let _ = tx.send(InnerRequest::SyncChainResponse(sync_share_chain));
return;
},
_ => {
error!(target: SYNC_REQUEST_LOG_TARGET, squad; "Failed to add Catchup synced blocks to share chain: {error:?}");
let _ = tx
.send(InnerRequest::GreyListPeer(
peer,
format!("Block failed validation: {error}"),
))
.await;
let _ = tx.send(InnerRequest::GreyListPeer(
peer,
format!("Block failed validation: {error}"),
));
},
},
};
@@ -1849,7 +1832,7 @@ where S: ShareChain
self.sync_share_chain(sync_chain).await;
},
InnerRequest::GreyListPeer(peer, reason) => {
self.network_peer_store.move_to_grey_list(peer, reason).await;
self.network_peer_store.move_to_grey_list(peer, reason);
},
InnerRequest::CatchUpSyncRequest((channel, sync)) => {
let CatchUpSync {
@@ -1908,6 +1891,9 @@
let mut connection_stats_publish = tokio::time::interval(Duration::from_secs(10));
connection_stats_publish.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut seek_connections_interval = tokio::time::interval(Duration::from_secs(10));
seek_connections_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut debug_chain_graph = if self.config.debug_print_chain {
tokio::time::interval(Duration::from_secs(60))
} else {
@@ -1944,6 +1930,30 @@
}

},
_ = seek_connections_interval.tick() => {
let timer = Instant::now();
if !self.config.is_seed_peer {
if self.swarm.connected_peers().count() > 15 {
continue;
}
let mut num_dialed = 0;
for (peer, record) in self.network_peer_store.whitelist_peers().iter() {
// dbg!("Connecting");
// dbg!("Dialing peer: {:?} on {:?}", record.peer_id, record.peer_info.public_addresses());
// let peer_id = peers.0;
if !self.swarm.is_connected(&record.peer_id) && self.network_peer_store.is_seed_peer(&record.peer_id) {
let _ = self.swarm.dial(record.peer_id.clone());
num_dialed += 1;
if num_dialed > 32 {
break;
}
}
}
}
if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT {
warn!(target: LOG_TARGET, "Seeking connections took too long: {:?}", timer.elapsed());
}
},
inner_req = self.inner_request_rx.recv() => {
let timer = Instant::now();
match inner_req {
Expand Down Expand Up @@ -2130,8 +2140,7 @@ where S: ShareChain

let _ = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync))
.await;
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
}
}
}
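The connection-seeking arm added to the select! loop above follows tokio's standard interval pattern. A self-contained sketch of that pattern, with a plain counter standing in for swarm.connected_peers().count() and the dial step stubbed out (both stand-ins are assumptions for illustration):

use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

#[tokio::main]
async fn main() {
    // Tick every 10 seconds; skip missed ticks rather than firing in a
    // burst, exactly as the new seek_connections_interval is configured.
    let mut seek = interval(Duration::from_secs(10));
    seek.set_missed_tick_behavior(MissedTickBehavior::Skip);

    let mut connected = 0usize; // stand-in for swarm.connected_peers().count()
    loop {
        seek.tick().await;
        if connected > 15 {
            break; // already holding at least 16 connections, nothing to do
        }
        // The real arm walks network_peer_store.whitelist_peers() and dials
        // up to 32 not-yet-connected peers via swarm.dial(); we only count.
        connected += 1;
        println!("dialed a peer, now {connected} connection(s)");
    }
}
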
2 changes: 1 addition & 1 deletion src/server/p2p/peer_store.rs
@@ -516,7 +516,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64
}

pub async fn move_to_grey_list(&mut self, peer_id: PeerId, reason: String) {
pub fn move_to_grey_list(&mut self, peer_id: PeerId, reason: String) {
if self.whitelist_peers.contains_key(&peer_id.to_base58()) {
let record = self.whitelist_peers.remove(&peer_id.to_base58());
if let Some(mut record) = record {
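The peer_store.rs hunk just drops async from move_to_grey_list, since nothing in its body awaits; every caller in network.rs then loses a needless .await, as the hunks above show. A toy stand-in for the idea (this struct and its fields are assumptions, not the real PeerStore):

use std::collections::HashMap;

struct PeerStore {
    whitelist_peers: HashMap<String, String>,
    greylist_peers: HashMap<String, String>,
}

impl PeerStore {
    // Plain synchronous method: no executor involvement, no .await at call sites.
    fn move_to_grey_list(&mut self, peer_id: String, reason: String) {
        if let Some(record) = self.whitelist_peers.remove(&peer_id) {
            println!("greylisting {peer_id}: {reason}");
            self.greylist_peers.insert(peer_id, record);
        }
    }
}

fn main() {
    let mut store = PeerStore {
        whitelist_peers: HashMap::from([("peer-a".to_string(), "record".to_string())]),
        greylist_peers: HashMap::new(),
    };
    store.move_to_grey_list("peer-a".to_string(), "block failed validation".to_string());
    assert!(store.whitelist_peers.is_empty());
}
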
1 change: 1 addition & 0 deletions src/server/p2p/setup.rs
@@ -116,6 +116,7 @@ pub(crate) async fn new_swarm(config: &config::Config) -> Result<Swarm<ServerNet
};
if let Some(max) = config.max_relay_circuits {
relay_config.max_circuits = max;
relay_config.max_reservations = max;
}
if let Some(max) = config.max_relay_circuits_per_peer {
relay_config.max_circuits_per_peer = max;
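The setup.rs change makes the single max_relay_circuits override cap relay reservations as well (a reservation is a peer holding a slot on the relay; a circuit is an active relayed connection through it). A hedged sketch of the same override as a standalone helper, using the public relay::Config fields the diff already touches (the helper itself is hypothetical):

use libp2p::relay;

// Apply the operator's single override to both limits so they scale together.
fn apply_relay_limits(mut cfg: relay::Config, max_relay_circuits: Option<usize>) -> relay::Config {
    if let Some(max) = max_relay_circuits {
        cfg.max_circuits = max;
        cfg.max_reservations = max; // the line this commit adds
    }
    cfg
}
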
