Skip to content

Commit

Permalink
Added debug info
Browse files Browse the repository at this point in the history
  • Loading branch information
povi committed Apr 25, 2024
1 parent 83dec58 commit 0c7e58b
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 140 deletions.
2 changes: 1 addition & 1 deletion binary_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn initialize_logger(
.filter_module("storage", LevelFilter::Info)
.filter_module("validator", LevelFilter::Info)
.filter_module("validator_key_cache", LevelFilter::Info)
.filter_module("web3", LevelFilter::Info)
.filter_module("web3", LevelFilter::Debug)
.filter_module(module_path!(), LevelFilter::Info)
.filter_module(module_path, LevelFilter::Info)
.format(|formatter, record| {
Expand Down
2 changes: 1 addition & 1 deletion builder_api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl Api {

let builder_bid = response.json::<SignedBuilderBid<P>>().await?;

debug!("get_execution_payload_header response: {builder_bid:?}");
info!("get_execution_payload_header response: {builder_bid:?}");

validate_phase(chain_config.phase_at_slot::<P>(slot), builder_bid.phase())?;

Expand Down
21 changes: 21 additions & 0 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ where
}

fn handle_tick(&mut self, wait_group: &W, tick: Tick) -> Result<()> {
info!("handle_tick: start");
if tick.epoch::<P>() > self.store.current_epoch() {
let checkpoint = self.store.unrealized_justified_checkpoint();

Expand All @@ -328,6 +329,8 @@ where
}
}

info!("handle_tick: after new epoch check");

// Query the execution engine for the current status of the head
// if it is still optimistic 1 second before the next interval.
if tick.is_end_of_interval() {
Expand All @@ -351,29 +354,43 @@ where
});
}

info!("handle_tick: will notify new payload");

self.execution_engine.notify_new_payload(
head.block_root,
execution_payload,
params,
None,
)?;

info!("handle_tick: will notified new payload");
}
}
}

info!("handle_tick: after end of interval check");

let Some(changes) = self.store_mut().apply_tick(tick)? else {
return Ok(());
};

info!("handle_tick: applied tick to store");

if changes.is_finalized_checkpoint_updated() {
self.archive_finalized(wait_group)?;
self.prune_delayed_until_payload();
}

info!("handle_tick: archived finalized and pruned stuff");

self.update_store_snapshot();

info!("handle_tick: updated store snapshot");

ValidatorMessage::Tick(wait_group.clone(), tick).send(&self.validator_tx);

info!("handle_tick: sent tick to validator");

if changes.is_slot_updated() {
let slot = tick.slot;

Expand All @@ -387,6 +404,8 @@ where
SubnetMessage::Slot(wait_group.clone(), slot).send(&self.subnet_tx);

self.track_collection_metrics();

info!("handle_tick: recorded and printed debug info");
}

if changes.is_finalized_checkpoint_updated() {
Expand Down Expand Up @@ -420,6 +439,8 @@ where
}
}

info!("handle_tick: finish");

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion http_api_utils/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use mime::{APPLICATION_JSON, TEXT_EVENT_STREAM};
use crate::{error::Error, misc::Direction};

// Don't log states when `Feature::LogHttpBodies` is enabled.
const ENDPOINTS_WITH_IGNORED_BODIES: &[&str] = &["/eth/v2/debug/beacon/states/", "/metrics"];
const ENDPOINTS_WITH_IGNORED_BODIES: &[&str] = &["/eth/v2/debug/beacon/states/", "/metrics", "/eth/v3/validator/blocks"];

async fn buffer_and_log<B>(direction: Direction, uri: &Uri, body: B) -> Result<Bytes, Error>
where
Expand Down
4 changes: 2 additions & 2 deletions p2p/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct SubnetPeerDiscovery {
// so it doesn't make sense to discover peers while unsubscribed.
// This is why `Unsubscribe` does not have a `discover_peers` field.
// `Subscribe` and `DiscoverPeers` are only separate to make log messages more precise.
#[derive(Serialize)]
#[derive(Debug, Serialize)]
pub enum SyncCommitteeSubnetAction {
/// Subscribe and discover peers.
Subscribe,
Expand All @@ -52,7 +52,7 @@ pub struct BeaconCommitteeSubscription {
pub is_aggregator: bool,
}

#[derive(PartialEq, Eq, Debug, Deserialize, Serialize)]
#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct SyncCommitteeSubscription {
#[serde(with = "serde_utils::string_or_native")]
Expand Down
59 changes: 45 additions & 14 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use core::{cmp::Ordering, convert::Infallible as Never, fmt::Display, time::Duration};
use core::{
cell::RefCell, cmp::Ordering, convert::Infallible as Never, fmt::Display, time::Duration,
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
time::Instant,
};
Expand Down Expand Up @@ -30,7 +32,8 @@ use futures::{
stream::StreamExt as _,
};
use helper_functions::misc;
use log::{debug, error, log, warn, Level};
use itertools::Itertools as _;
use log::{debug, error, info, log, warn, Level};
use operation_pools::{BlsToExecutionChangePool, Origin, PoolToP2pMessage, SyncCommitteeAggPool};
use prometheus_client::registry::Registry;
use prometheus_metrics::Metrics;
Expand Down Expand Up @@ -120,6 +123,7 @@ pub struct Network<P: Preset> {
target_peers: usize,
#[allow(dead_code)]
port_mappings: Option<PortMappings>,
gr_subscribed: RefCell<BTreeSet<SubnetId>>,
}

impl<P: Preset> Network<P> {
Expand Down Expand Up @@ -195,6 +199,7 @@ impl<P: Preset> Network<P> {
shutdown_rx,
target_peers: network_config.target_peers,
port_mappings,
gr_subscribed: RefCell::new(BTreeSet::new()),
};

Ok(network)
Expand Down Expand Up @@ -695,6 +700,8 @@ impl<P: Preset> Network<P> {
}

fn update_attestation_subnets(&self, subnet_actions: AttestationSubnetActions) {
info!("update_attestation_subnets (actions: {subnet_actions:?}");

let chain_config = self.controller.chain_config();
let current_slot = self.controller.slot();

Expand Down Expand Up @@ -736,7 +743,7 @@ impl<P: Preset> Network<P> {

if subscribe {
self.log(
Level::Debug,
Level::Info,
format_args!("subscribing to attestation subnet (subnet_id: {subnet_id})"),
);

Expand All @@ -751,7 +758,7 @@ impl<P: Preset> Network<P> {
}
} else {
self.log(
Level::Debug,
Level::Info,
format_args!("unsubscribing from attestation subnet {subnet_id}"),
);

Expand All @@ -766,12 +773,12 @@ impl<P: Preset> Network<P> {

if add_to_enr {
self.log(
Level::Debug,
Level::Info,
format_args!("adding attestation subnet to ENR (subnet_id: {subnet_id})"),
);
} else {
self.log(
Level::Debug,
Level::Info,
format_args!("removing attestation subnet from ENR (subnet_id: {subnet_id})"),
);
}
Expand All @@ -785,6 +792,8 @@ impl<P: Preset> Network<P> {
&self,
actions: BTreeMap<SubnetId, SyncCommitteeSubnetAction>,
) {
info!("update_sync_committee_subnets (actions: {actions:?}");

let subnet_discoveries = actions
.iter()
.filter(|(_, action)| {
Expand All @@ -797,7 +806,11 @@ impl<P: Preset> Network<P> {
subnet: Subnet::SyncCommittee(*subnet_id),
min_ttl: None,
})
.collect();
.collect_vec();

for discovery in &subnet_discoveries {
info!("GR SUBNETS: discover peers in subnet {discovery:?}");
}

ServiceInboundMessage::DiscoverSubnetPeers(subnet_discoveries)
.send(&self.network_to_service_tx);
Expand All @@ -814,9 +827,12 @@ impl<P: Preset> Network<P> {

// TODO(Grandine Team): Does it make sense to use the Phase 0 digest here?
if let Some(topic) = self.subnet_gossip_topic(subnet) {
info!("GR SUBNETS: subscribe to subnet {subnet_id:?}");
self.gr_subscribed.borrow_mut().insert(subnet_id);
ServiceInboundMessage::Subscribe(topic).send(&self.network_to_service_tx);
}

info!("GR SUBNETS: add {subnet:?} to ENR");
ServiceInboundMessage::UpdateEnrSubnet(subnet, true)
.send(&self.network_to_service_tx);
}
Expand All @@ -834,14 +850,22 @@ impl<P: Preset> Network<P> {

// TODO(Grandine Team): Does it make sense to use the Phase 0 digest here?
if let Some(topic) = self.subnet_gossip_topic(subnet) {
info!("GR SUBNETS: unsubscribe from subnet {subnet_id:?}");
self.gr_subscribed.borrow_mut().remove(&subnet_id);
ServiceInboundMessage::Unsubscribe(topic).send(&self.network_to_service_tx);
}

info!("GR SUBNETS: remove {subnet:?} from ENR");
ServiceInboundMessage::UpdateEnrSubnet(subnet, false)
.send(&self.network_to_service_tx);
}
}
}

info!(
"SUBNETS STATS ALL: GR {:?}",
self.gr_subscribed.borrow().iter().copied().collect_vec(),
);
}

fn handle_network_event(&mut self, network_event: NetworkEvent<RequestId, P>) {
Expand Down Expand Up @@ -1532,9 +1556,9 @@ impl<P: Preset> Network<P> {
}

self.log(
Level::Debug,
Level::Info,
format_args!(
"received proposer slashing as gossip: {proposer_slashing:?} from {source}"
"SLASHING received proposer slashing as gossip: {proposer_slashing:?} from {source}"
),
);

Expand All @@ -1549,9 +1573,9 @@ impl<P: Preset> Network<P> {
}

self.log(
Level::Debug,
Level::Info,
format_args!(
"received attester slashing as gossip: {attester_slashing:?} from {source}"
"SLASHING received attester slashing as gossip: {attester_slashing:?} from {source}"
),
);

Expand Down Expand Up @@ -1903,9 +1927,16 @@ impl<P: Preset> Network<P> {
fn subnet_gossip_topic(&self, subnet: Subnet) -> Option<GossipTopic> {
let current_phase = self.fork_context.current_fork();

self.fork_context
let res = self
.fork_context
.to_context_bytes(current_phase)
.map(|digest| GossipTopic::new(subnet.into(), GossipEncoding::default(), digest))
.map(|digest| GossipTopic::new(subnet.into(), GossipEncoding::default(), digest));

if res.is_none() {
warn!("SUBNETS subnet topic not found! (subnet: {subnet:?})");
}

res
}

fn prune_received_blob_sidecars(&mut self, epoch: Epoch) {
Expand Down
Loading

0 comments on commit 0c7e58b

Please sign in to comment.