Skip to content

Commit

Permalink
Merge branch 'development' of github.com:tari-project/sha-p2pool into…
Browse files Browse the repository at this point in the history
… development
  • Loading branch information
stringhandler committed Nov 14, 2024
2 parents ef9192c + c1b9083 commit d7c2196
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 14 deletions.
6 changes: 6 additions & 0 deletions src/server/http/stats/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ pub(crate) async fn handle_get_stats(State(state): State<AppState>) -> Result<Js
let timer = std::time::Instant::now();
info!(target: LOG_TARGET, "handle_get_stats");

let last_gossip_message = state.stats_client.get_last_gossip_message().await.map_err(|error| {
error!(target: LOG_TARGET, "Failed to get last gossip message: {error:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;

let (rx_stats, sha3x_stats) = get_chain_stats(state.clone()).await?;
// let peer_count = state.peer_store.peer_count().await;
let peer_count = 0;
Expand Down Expand Up @@ -236,6 +241,7 @@ pub(crate) async fn handle_get_stats(State(state): State<AppState>) -> Result<Js
connected_since,
randomx_stats: rx_stats,
sha3x_stats,
last_gossip_message,
};
if timer.elapsed() > MAX_ACCEPTABLE_HTTP_TIMEOUT {
error!(target: LOG_TARGET, "handle_get_stats took too long: {}ms", timer.elapsed().as_millis());
Expand Down
1 change: 1 addition & 0 deletions src/server/http/stats/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub struct Stats {
pub connected_since: Option<EpochTime>,
pub randomx_stats: GetStatsResponse,
pub sha3x_stats: GetStatsResponse,
pub last_gossip_message: EpochTime,
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down
32 changes: 30 additions & 2 deletions src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(crate) struct StatsCollector {
pending_outgoing: u32,
established_incoming: u32,
established_outgoing: u32,
last_gossip_message: EpochTime,
}

impl StatsCollector {
Expand Down Expand Up @@ -67,6 +68,7 @@ impl StatsCollector {
pending_outgoing: 0,
established_incoming: 0,
established_outgoing: 0,
last_gossip_message: EpochTime::now(),
}
}

Expand Down Expand Up @@ -148,6 +150,9 @@ impl StatsCollector {
self.established_incoming = established_incoming;
self.established_outgoing = established_outgoing;
},
StatData::GossipsubMessageReceived { timestamp } => {
self.last_gossip_message = timestamp;
},
}
}

Expand All @@ -164,7 +169,7 @@ impl StatsCollector {
let formatter = Formatter::new();

info!(target: LOG_TARGET,
"========= Uptime: {}. V{} Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner(A/R): {}/{}. Pool(A/R) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{}==== ",
"========= Uptime: {}. V{} Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner(A/R): {}/{}. Pool(A/R) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
humantime::format_duration(Duration::from_secs(
EpochTime::now().as_u64().checked_sub(
self.first_stat_received.unwrap_or(EpochTime::now()).as_u64())
Expand All @@ -186,7 +191,10 @@ impl StatsCollector {
self.total_grey_list,
self.total_black_list,
self.established_incoming,
self.established_outgoing
self.established_outgoing,
humantime::format_duration(Duration::from_secs(
EpochTime::now().as_u64().checked_sub(self.last_gossip_message.as_u64()).unwrap_or_default())),

);
},
res = self.request_rx.recv() => {
Expand All @@ -212,6 +220,9 @@ impl StatsCollector {
},
}
},
Some(StatsRequest::GetLastGossipMessage(tx)) => {
let _ = tx.send(self.last_gossip_message).inspect_err(|e| error!(target: LOG_TARGET, "Error sending last gossip message: {:?}", e));
},
None => {
break;
}
Expand Down Expand Up @@ -245,6 +256,7 @@ impl StatsCollector {

pub(crate) enum StatsRequest {
GetStats(PowAlgorithm, tokio::sync::oneshot::Sender<GetStatsResponse>),
GetLastGossipMessage(tokio::sync::oneshot::Sender<EpochTime>),
}

#[derive(Serialize, Clone, Debug)]
Expand Down Expand Up @@ -309,6 +321,9 @@ pub(crate) enum StatData {
established_outgoing: u32,
timestamp: EpochTime,
},
GossipsubMessageReceived {
timestamp: EpochTime,
},
}

impl StatData {
Expand All @@ -323,6 +338,7 @@ impl StatData {
StatData::TargetDifficultyChanged { timestamp, .. } => *timestamp,
StatData::NetworkDifficultyChanged { timestamp, .. } => *timestamp,
StatData::LibP2PStats { timestamp, .. } => *timestamp,
StatData::GossipsubMessageReceived { timestamp } => *timestamp,
}
}
}
Expand All @@ -338,6 +354,12 @@ impl StatsClient {
self.request_tx.send(StatsRequest::GetStats(pow_algo, tx)).await?;
Ok(rx.await?)
}

pub async fn get_last_gossip_message(&self) -> Result<EpochTime, anyhow::Error> {
let (tx, rx) = oneshot::channel();
self.request_tx.send(StatsRequest::GetLastGossipMessage(tx)).await?;
Ok(rx.await?)
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -455,4 +477,10 @@ impl StatsBroadcastClient {
timestamp: EpochTime::now(),
})
}

pub fn send_gossipsub_message_received(&self) -> Result<(), anyhow::Error> {
self.broadcast(StatData::GossipsubMessageReceived {
timestamp: EpochTime::now(),
})
}
}
31 changes: 19 additions & 12 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use libp2p::{
swarm::{
behaviour::toggle::Toggle,
dial_opts::{DialOpts, PeerCondition},
ExternalAddrExpired,
NetworkBehaviour,
SwarmEvent,
},
Expand Down Expand Up @@ -528,6 +529,7 @@ where S: ShareChain
#[allow(clippy::too_many_lines)]
async fn handle_new_gossipsub_message(&mut self, message: Message) -> Result<MessageAcceptance, Error> {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "New gossipsub message: {message:?}");
let _ = self.stats_broadcast_client.send_gossipsub_message_received();
let source_peer = message.source;
if let Some(source_peer) = source_peer {
let topic = message.topic.to_string();
Expand Down Expand Up @@ -616,16 +618,20 @@ where S: ShareChain

self.network_peer_store
.add_last_new_tip_notify(&source_peer, payload.clone());
let _ = self.swarm
.behaviour_mut()
.peer_sync
.add_want_peers(vec![source_peer.clone()])
.await.inspect_err(|error| {
info!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add want peers: {error:?}");
});
// let _ = self.swarm
// .behaviour_mut()
// .peer_sync
// .add_want_peers(vec![source_peer.clone()])
// .await.inspect_err(|error| {
// info!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add want peers:
// {error:?}"); });
// If we don't have this peer, try do peer exchange
if !self.network_peer_store.exists(message_peer) {
self.initiate_direct_peer_exchange(message_peer).await;
// if !self.network_peer_store.exists(message_peer) {
// self.initiate_direct_peer_exchange(message_peer).await;
// }

if self.config.is_seed_peer {
return Ok(MessageAcceptance::Accept);
}

// verify payload
Expand Down Expand Up @@ -1082,8 +1088,9 @@ where S: ShareChain
SwarmEvent::ListenerError { listener_id, error } => {
error!(target: LOG_TARGET, squad = &self.config.squad; "Listener error: {listener_id:?} -> {error:?}");
},
SwarmEvent::ExternalAddrExpired { address, .. } => {
SwarmEvent::ExternalAddrExpired { address } => {
warn!(target: LOG_TARGET, squad = &self.config.squad; "External address has expired: {address:?}. TODO: Do we need to create a new one?");
self.attempt_relay_reservation().await;
},
SwarmEvent::Behaviour(event) => match event {
ServerNetworkBehaviourEvent::Mdns(mdns_event) => match mdns_event {
Expand Down Expand Up @@ -1878,7 +1885,7 @@ where S: ShareChain
},
_ = sync_interval.tick() => {
let timer = Instant::now();
if self.config.sync_job_enabled {
if !self.config.is_seed_peer && self.config.sync_job_enabled {
self.try_sync_from_best_peer().await;
}
if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT {
Expand Down Expand Up @@ -1916,7 +1923,7 @@ where S: ShareChain
_ = connection_stats_publish.tick() => {
let timer = Instant::now();
let connection_info = self.get_libp2p_connection_info();
self.stats_broadcast_client.send_libp2p_stats(
let _ = self.stats_broadcast_client.send_libp2p_stats(
connection_info.network_info.connection_counters.pending_incoming,
connection_info.network_info.connection_counters.pending_outgoing,
connection_info.network_info.connection_counters.established_incoming,
Expand Down

0 comments on commit d7c2196

Please sign in to comment.