From c1b908300083cd64bce31e6f84688cbb19211c1a Mon Sep 17 00:00:00 2001 From: stringhandler Date: Thu, 14 Nov 2024 13:55:02 +0200 Subject: [PATCH] feat: add last gossip message received to stats (#157) --- src/server/http/stats/handlers.rs | 6 ++++++ src/server/http/stats/models.rs | 1 + src/server/http/stats_collector.rs | 32 ++++++++++++++++++++++++++++-- src/server/p2p/network.rs | 31 ++++++++++++++++++----------- 4 files changed, 56 insertions(+), 14 deletions(-) diff --git a/src/server/http/stats/handlers.rs b/src/server/http/stats/handlers.rs index 22f8a7b0..1bce372f 100644 --- a/src/server/http/stats/handlers.rs +++ b/src/server/http/stats/handlers.rs @@ -197,6 +197,11 @@ pub(crate) async fn handle_get_stats(State(state): State) -> Result) -> Result MAX_ACCEPTABLE_HTTP_TIMEOUT { error!(target: LOG_TARGET, "handle_get_stats took too long: {}ms", timer.elapsed().as_millis()); diff --git a/src/server/http/stats/models.rs b/src/server/http/stats/models.rs index 6e3374a1..8a3b168a 100644 --- a/src/server/http/stats/models.rs +++ b/src/server/http/stats/models.rs @@ -92,6 +92,7 @@ pub struct Stats { pub connected_since: Option, pub randomx_stats: GetStatsResponse, pub sha3x_stats: GetStatsResponse, + pub last_gossip_message: EpochTime, } #[derive(Serialize, Deserialize, Clone)] diff --git a/src/server/http/stats_collector.rs b/src/server/http/stats_collector.rs index 0b3ae908..21247107 100644 --- a/src/server/http/stats_collector.rs +++ b/src/server/http/stats_collector.rs @@ -37,6 +37,7 @@ pub(crate) struct StatsCollector { pending_outgoing: u32, established_incoming: u32, established_outgoing: u32, + last_gossip_message: EpochTime, } impl StatsCollector { @@ -67,6 +68,7 @@ impl StatsCollector { pending_outgoing: 0, established_incoming: 0, established_outgoing: 0, + last_gossip_message: EpochTime::now(), } } @@ -148,6 +150,9 @@ impl StatsCollector { self.established_incoming = established_incoming; self.established_outgoing = established_outgoing; }, + StatData::GossipsubMessageReceived { timestamp } => { + self.last_gossip_message = timestamp; + }, } } @@ -164,7 +169,7 @@ impl StatsCollector { let formatter = Formatter::new(); info!(target: LOG_TARGET, - "========= Uptime: {}. V{} Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner(A/R): {}/{}. Pool(A/R) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{}==== ", + "========= Uptime: {}. V{} Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner(A/R): {}/{}. Pool(A/R) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ", humantime::format_duration(Duration::from_secs( EpochTime::now().as_u64().checked_sub( self.first_stat_received.unwrap_or(EpochTime::now()).as_u64()) @@ -186,7 +191,10 @@ impl StatsCollector { self.total_grey_list, self.total_black_list, self.established_incoming, - self.established_outgoing + self.established_outgoing, + humantime::format_duration(Duration::from_secs( + EpochTime::now().as_u64().checked_sub(self.last_gossip_message.as_u64()).unwrap_or_default())), + ); }, res = self.request_rx.recv() => { @@ -212,6 +220,9 @@ impl StatsCollector { }, } }, + Some(StatsRequest::GetLastGossipMessage(tx)) => { + let _ = tx.send(self.last_gossip_message).inspect_err(|e| error!(target: LOG_TARGET, "Error sending last gossip message: {:?}", e)); + }, None => { break; } @@ -245,6 +256,7 @@ impl StatsCollector { pub(crate) enum StatsRequest { GetStats(PowAlgorithm, tokio::sync::oneshot::Sender), + GetLastGossipMessage(tokio::sync::oneshot::Sender), } #[derive(Serialize, Clone, Debug)] @@ -309,6 +321,9 @@ pub(crate) enum StatData { established_outgoing: u32, timestamp: EpochTime, }, + GossipsubMessageReceived { + timestamp: EpochTime, + }, } impl StatData { @@ -323,6 +338,7 @@ impl StatData { StatData::TargetDifficultyChanged { timestamp, .. } => *timestamp, StatData::NetworkDifficultyChanged { timestamp, .. } => *timestamp, StatData::LibP2PStats { timestamp, .. } => *timestamp, + StatData::GossipsubMessageReceived { timestamp } => *timestamp, } } } @@ -338,6 +354,12 @@ impl StatsClient { self.request_tx.send(StatsRequest::GetStats(pow_algo, tx)).await?; Ok(rx.await?) } + + pub async fn get_last_gossip_message(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.request_tx.send(StatsRequest::GetLastGossipMessage(tx)).await?; + Ok(rx.await?) + } } #[derive(Debug, Clone)] @@ -455,4 +477,10 @@ impl StatsBroadcastClient { timestamp: EpochTime::now(), }) } + + pub fn send_gossipsub_message_received(&self) -> Result<(), anyhow::Error> { + self.broadcast(StatData::GossipsubMessageReceived { + timestamp: EpochTime::now(), + }) + } } diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index 86a2199e..f29b5185 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -41,6 +41,7 @@ use libp2p::{ swarm::{ behaviour::toggle::Toggle, dial_opts::{DialOpts, PeerCondition}, + ExternalAddrExpired, NetworkBehaviour, SwarmEvent, }, @@ -528,6 +529,7 @@ where S: ShareChain #[allow(clippy::too_many_lines)] async fn handle_new_gossipsub_message(&mut self, message: Message) -> Result { debug!(target: MESSAGE_LOGGING_LOG_TARGET, "New gossipsub message: {message:?}"); + let _ = self.stats_broadcast_client.send_gossipsub_message_received(); let source_peer = message.source; if let Some(source_peer) = source_peer { let topic = message.topic.to_string(); @@ -616,16 +618,20 @@ where S: ShareChain self.network_peer_store .add_last_new_tip_notify(&source_peer, payload.clone()); - let _ = self.swarm - .behaviour_mut() - .peer_sync - .add_want_peers(vec![source_peer.clone()]) - .await.inspect_err(|error| { - info!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add want peers: {error:?}"); - }); + // let _ = self.swarm + // .behaviour_mut() + // .peer_sync + // .add_want_peers(vec![source_peer.clone()]) + // .await.inspect_err(|error| { + // info!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add want peers: + // {error:?}"); }); // If we don't have this peer, try do peer exchange - if !self.network_peer_store.exists(message_peer) { - self.initiate_direct_peer_exchange(message_peer).await; + // if !self.network_peer_store.exists(message_peer) { + // self.initiate_direct_peer_exchange(message_peer).await; + // } + + if self.config.is_seed_peer { + return Ok(MessageAcceptance::Accept); } // verify payload @@ -1082,8 +1088,9 @@ where S: ShareChain SwarmEvent::ListenerError { listener_id, error } => { error!(target: LOG_TARGET, squad = &self.config.squad; "Listener error: {listener_id:?} -> {error:?}"); }, - SwarmEvent::ExternalAddrExpired { address, .. } => { + SwarmEvent::ExternalAddrExpired { address } => { warn!(target: LOG_TARGET, squad = &self.config.squad; "External address has expired: {address:?}. TODO: Do we need to create a new one?"); + self.attempt_relay_reservation().await; }, SwarmEvent::Behaviour(event) => match event { ServerNetworkBehaviourEvent::Mdns(mdns_event) => match mdns_event { @@ -1878,7 +1885,7 @@ where S: ShareChain }, _ = sync_interval.tick() => { let timer = Instant::now(); - if self.config.sync_job_enabled { + if !self.config.is_seed_peer && self.config.sync_job_enabled { self.try_sync_from_best_peer().await; } if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT { @@ -1916,7 +1923,7 @@ where S: ShareChain _ = connection_stats_publish.tick() => { let timer = Instant::now(); let connection_info = self.get_libp2p_connection_info(); - self.stats_broadcast_client.send_libp2p_stats( + let _ = self.stats_broadcast_client.send_libp2p_stats( connection_info.network_info.connection_counters.pending_incoming, connection_info.network_info.connection_counters.pending_outgoing, connection_info.network_info.connection_counters.established_incoming,