Skip to content

Commit

Permalink
feat: display waiting for intial peer data (#6545)
Browse files Browse the repository at this point in the history
Description
---
Displays waiting for initial peer data on the status string

Motivation and Context
---
better info for the user:
![Screenshot 2024-09-09 at 16 59
47](https://github.com/user-attachments/assets/78dcb4c2-430c-42c3-9ac5-9e25465c5e94)
  • Loading branch information
SWvheerden authored Sep 10, 2024
1 parent 0a5d20b commit 39aed8e
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 25 deletions.
1 change: 1 addition & 0 deletions applications/minotari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ message SyncProgressResponse {
uint64 local_height = 2;
SyncState state = 3;
string short_desc = 4;
uint64 initial_connected_peers = 5;
}

enum SyncState {
Expand Down
5 changes: 5 additions & 0 deletions applications/minotari_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2012,24 +2012,28 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
local_height: 0,
state: tari_rpc::SyncState::HeaderStarting.into(),
short_desc,
initial_connected_peers: 0,
},
StateInfo::HeaderSync(Some(info)) => tari_rpc::SyncProgressResponse {
tip_height: info.tip_height,
local_height: info.local_height,
state: tari_rpc::SyncState::Header.into(),
short_desc,
initial_connected_peers: 0,
},
StateInfo::Connecting(_) => tari_rpc::SyncProgressResponse {
tip_height: 0,
local_height: 0,
state: tari_rpc::SyncState::BlockStarting.into(),
short_desc,
initial_connected_peers: 0,
},
StateInfo::BlockSync(info) => tari_rpc::SyncProgressResponse {
tip_height: info.tip_height,
local_height: info.local_height,
state: tari_rpc::SyncState::Block.into(),
short_desc,
initial_connected_peers: 0,
},
_ => tari_rpc::SyncProgressResponse {
tip_height: 0,
Expand All @@ -2040,6 +2044,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
tari_rpc::SyncState::Startup.into()
},
short_desc,
initial_connected_peers: state.get_initial_connected_peers(),
},
};
Ok(Response::new(response))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub struct BaseNodeStateMachineConfig {
/// to always be behind the network
#[serde(with = "serializers::seconds")]
pub time_before_considered_lagging: Duration,
/// This is the amount of metadata events that a node will wait for before decide to start syncing for a peer,
/// choosing the best peer out of the list
pub initial_sync_peer_count: u64,
}

#[allow(clippy::derivable_impls)]
Expand All @@ -71,6 +74,7 @@ impl Default for BaseNodeStateMachineConfig {
blockchain_sync_config: Default::default(),
blocks_behind_before_considered_lagging: 1,
time_before_considered_lagging: Duration::from_secs(10),
initial_sync_peer_count: 5,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,17 @@ impl StateInfo {
HorizonSync(info) => info.to_progress_string(),

BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string_blocks()),
Listening(_) => "Listening".to_string(),
Listening(info) => {
if info.is_synced() {
"Listening".to_string()
} else {
format!(
"Waiting for peer data: {}/{}",
info.initial_delay_connected_count(),
info.initial_sync_peer_wait_count()
)
}
},
SyncFailed(details) => format!("Sync failed: {}", details),
}
}
Expand All @@ -230,6 +240,13 @@ impl StateInfo {
Listening(info) => info.is_synced(),
}
}

pub fn get_initial_connected_peers(&self) -> u64 {
match self {
StateInfo::Listening(info) => info.initial_delay_connected_count(),
_ => 0,
}
}
}

impl Display for StateInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use crate::{
};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::listening";
const INITIAL_SYNC_PEER_COUNT: usize = 5;

/// This struct contains the info of the peer, and is used to serialised and deserialised.
#[derive(Serialize, Deserialize)]
Expand All @@ -78,6 +77,8 @@ impl PeerMetadata {
/// This struct contains info that is use full for external viewing of state info
pub struct ListeningInfo {
synced: bool,
initial_delay_connected_count: u64,
initial_sync_peer_wait_count: u64,
}

impl Display for ListeningInfo {
Expand All @@ -88,13 +89,25 @@ impl Display for ListeningInfo {

impl ListeningInfo {
/// Creates a new ListeningInfo
pub const fn new(is_synced: bool) -> Self {
Self { synced: is_synced }
pub const fn new(is_synced: bool, initial_delay_connected_count: u64, initial_sync_peer_wait_count: u64) -> Self {
Self {
synced: is_synced,
initial_delay_connected_count,
initial_sync_peer_wait_count,
}
}

pub fn is_synced(&self) -> bool {
self.synced
}

pub fn initial_delay_connected_count(&self) -> u64 {
self.initial_delay_connected_count
}

pub fn initial_sync_peer_wait_count(&self) -> u64 {
self.initial_sync_peer_wait_count
}
}

/// This state listens for chain metadata events received from the liveness and chain metadata service. Based on the
Expand All @@ -103,6 +116,7 @@ impl ListeningInfo {
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct Listening {
is_synced: bool,
initial_delay_count: u64,
}

impl Listening {
Expand All @@ -116,7 +130,11 @@ impl Listening {
shared: &mut BaseNodeStateMachine<B>,
) -> StateEvent {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced)));
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
self.is_synced,
self.initial_delay_count,
shared.config.initial_sync_peer_count,
)));
let mut time_since_better_block = None;
let mut initial_sync_counter = 0;
let mut initial_sync_peer_list = Vec::new();
Expand All @@ -130,11 +148,25 @@ impl Listening {
debug!("NetworkSilence event received");
if !self.is_synced {
self.is_synced = true;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true)));
self.initial_delay_count = 0;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
true,
0,
shared.config.initial_sync_peer_count,
)));
debug!(target: LOG_TARGET, "Initial sync achieved");
}
},
Ok(ChainMetadataEvent::PeerChainMetadataReceived(peer_metadata)) => {
// if we are not yet synced, we wait for the initial delay of ping/pongs, so let's propagate the
// updated info
if !self.is_synced {
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
self.is_synced,
self.initial_delay_count,
shared.config.initial_sync_peer_count,
)));
}
// We already ban the peer based on some previous logic, but this message was already in the
// pipeline before the ban went into effect.
match shared.peer_manager.is_peer_banned(peer_metadata.node_id()).await {
Expand Down Expand Up @@ -224,7 +256,12 @@ impl Listening {

if !self.is_synced && sync_mode.is_up_to_date() {
self.is_synced = true;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true)));
self.initial_delay_count = 0;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
true,
0,
shared.config.initial_sync_peer_count,
)));
debug!(target: LOG_TARGET, "Initial sync achieved");
}

Expand All @@ -243,6 +280,7 @@ impl Listening {
} = sync_mode
{
initial_sync_counter += 1;
self.initial_delay_count = initial_sync_counter;
for peer in sync_peers {
let mut found = false;
// lets search the list list to ensure we only have unique peers in the list with the latest
Expand All @@ -263,7 +301,7 @@ impl Listening {
}
// We use a list here to ensure that we dont wait for even for INITIAL_SYNC_PEER_COUNT different
// peers
if initial_sync_counter >= INITIAL_SYNC_PEER_COUNT {
if initial_sync_counter >= shared.config.initial_sync_peer_count {
// lets return now that we have enough peers to chose from
return StateEvent::FallenBehind(SyncStatus::Lagging {
local,
Expand Down Expand Up @@ -293,14 +331,18 @@ impl Listening {

impl From<Waiting> for Listening {
fn from(_: Waiting) -> Self {
Self { is_synced: false }
Self {
is_synced: false,
initial_delay_count: 0,
}
}
}

impl From<HeaderSyncState> for Listening {
fn from(sync: HeaderSyncState) -> Self {
Self {
is_synced: sync.is_synced(),
initial_delay_count: 0,
}
}
}
Expand All @@ -309,6 +351,7 @@ impl From<BlockSync> for Listening {
fn from(sync: BlockSync) -> Self {
Self {
is_synced: sync.is_synced(),
initial_delay_count: 0,
}
}
}
Expand All @@ -317,6 +360,7 @@ impl From<DecideNextSync> for Listening {
fn from(sync: DecideNextSync) -> Self {
Self {
is_synced: sync.is_synced(),
initial_delay_count: 0,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/tests/tests/base_node_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn setup() -> (
.await;
base_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
Expand Down
8 changes: 4 additions & 4 deletions base_layer/core/tests/tests/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,19 +1067,19 @@ async fn receive_and_propagate_transaction() {

alice_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
bob_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
carol_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
Expand Down Expand Up @@ -1738,7 +1738,7 @@ async fn block_event_and_reorg_event_handling() {

alice.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
Expand Down
22 changes: 11 additions & 11 deletions base_layer/core/tests/tests/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,25 @@ async fn propagate_and_forward_many_valid_blocks() {
wait_until_online(&[&alice_node, &bob_node, &carol_node, &dan_node]).await;
alice_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
bob_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
carol_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
dan_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
Expand Down Expand Up @@ -271,19 +271,19 @@ async fn propagate_and_forward_invalid_block_hash() {
wait_until_online(&[&alice_node, &bob_node, &carol_node]).await;
alice_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
bob_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
carol_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
Expand Down Expand Up @@ -427,25 +427,25 @@ async fn propagate_and_forward_invalid_block() {

alice_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
bob_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
carol_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
dan_node.mock_base_node_state_machine.publish_status(StatusInfo {
bootstrapped: true,
state_info: StateInfo::Listening(ListeningInfo::new(true)),
state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)),
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node_c.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ blockchain_sync_config.rpc_deadline = 240
# intensive. Be careful of setting this higher than the block time, which would potentially cause it
# to always be behind the network (default = 10) (in seconds)
#time_before_considered_lagging = 10
#This is the amount of metadata events that a node will wait for before decide to start syncing for a peer, choosing the best peer out of the list
#initial_sync_peer_count = 5,

[base_node.p2p]
# The node's publicly-accessible hostname. This is the host name that is advertised on the network so that
Expand Down

0 comments on commit 39aed8e

Please sign in to comment.