Skip to content

Commit

Permalink
Improvements to sync committee aggregation pool
Browse files Browse the repository at this point in the history
- Fix issue with 'on_slot' not being called for non-built-in validator
  flow (e.g. Vouch).

- Add Slot to Pool.aggregator_contributions key.

- Add 'Collection lengths' chart to pools dashboard.

- Add 'module_name' field to Metrics.collection_lengths metrics to allow easier
  filtering in Grafana charts.

- Add PoolManager service to handle incoming messages from fork_choice.

- Ignore known sync committee contribution subsets.
  • Loading branch information
Tumas committed Jun 11, 2024
1 parent bec4f88 commit 8c7fe09
Show file tree
Hide file tree
Showing 23 changed files with 667 additions and 317 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions eth1/src/eth1_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl Eth1Chain {

pub fn track_collection_metrics(&self, metrics: &Arc<Metrics>) {
metrics.set_collection_length(
module_path!(),
&tynm::type_name::<Self>(),
"unfinalized_blocks",
self.unfinalized_blocks
Expand Down
6 changes: 4 additions & 2 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use types::{

use crate::{
messages::{
ApiMessage, AttestationVerifierMessage, MutatorMessage, P2pMessage, SubnetMessage,
SyncMessage, ValidatorMessage,
ApiMessage, AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage,
SubnetMessage, SyncMessage, ValidatorMessage,
},
misc::{VerifyAggregateAndProofResult, VerifyAttestationResult},
mutator::Mutator,
Expand Down Expand Up @@ -98,6 +98,7 @@ where
api_tx: impl UnboundedSink<ApiMessage<P>>,
attestation_verifier_tx: A, // impl UnboundedSink<AttestationVerifierMessage<P, W>>,
p2p_tx: impl UnboundedSink<P2pMessage<P>>,
pool_tx: impl UnboundedSink<PoolMessage>,
subnet_tx: impl UnboundedSink<SubnetMessage<W>>,
sync_tx: impl UnboundedSink<SyncMessage<P>>,
validator_tx: impl UnboundedSink<ValidatorMessage<P, W>>,
Expand Down Expand Up @@ -136,6 +137,7 @@ where
api_tx,
attestation_verifier_tx.clone(),
p2p_tx,
pool_tx,
subnet_tx,
sync_tx,
validator_tx,
Expand Down
2 changes: 1 addition & 1 deletion fork_choice_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use crate::{
controller::Controller,
messages::{
ApiMessage, AttestationVerifierMessage, BlockEvent, ChainReorgEvent,
FinalizedCheckpointEvent, HeadEvent, P2pMessage, SubnetMessage, SyncMessage,
FinalizedCheckpointEvent, HeadEvent, P2pMessage, PoolMessage, SubnetMessage, SyncMessage,
ValidatorMessage,
},
misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult},
Expand Down
13 changes: 13 additions & 0 deletions fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,19 @@ impl<P: Preset> P2pMessage<P> {
}
}

pub enum PoolMessage {
Slot(Slot),
Tick(Tick),
}

impl PoolMessage {
pub(crate) fn send(self, tx: &impl UnboundedSink<Self>) {
if tx.unbounded_send(self).is_err() {
debug!("send to operation pools failed because the receiver was dropped");
}
}
}

pub enum ValidatorMessage<P: Preset, W> {
Tick(W, Tick),
FinalizedEth1Data(DepositIndex),
Expand Down
37 changes: 31 additions & 6 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ use types::{

use crate::{
messages::{
AttestationVerifierMessage, MutatorMessage, P2pMessage, SubnetMessage, SyncMessage,
ValidatorMessage,
AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage, SubnetMessage,
SyncMessage, ValidatorMessage,
},
misc::{
Delayed, MutatorRejectionReason, PendingAggregateAndProof, PendingAttestation,
Expand All @@ -78,7 +78,7 @@ use crate::{
};

#[allow(clippy::struct_field_names)]
pub struct Mutator<P: Preset, E, W, AS, TS, PS, NS, SS, VS> {
pub struct Mutator<P: Preset, E, W, AS, TS, PS, LS, NS, SS, VS> {
store: Arc<Store<P>>,
store_snapshot: Arc<ArcSwap<Store<P>>>,
state_cache: Arc<StateCache<P, W>>,
Expand Down Expand Up @@ -114,19 +114,21 @@ pub struct Mutator<P: Preset, E, W, AS, TS, PS, NS, SS, VS> {
api_tx: AS,
attestation_verifier_tx: TS,
p2p_tx: PS,
pool_tx: LS,
subnet_tx: NS,
sync_tx: SS,
validator_tx: VS,
}

impl<P, E, W, AS, TS, PS, NS, SS, VS> Mutator<P, E, W, AS, TS, PS, NS, SS, VS>
impl<P, E, W, AS, TS, PS, LS, NS, SS, VS> Mutator<P, E, W, AS, TS, PS, LS, NS, SS, VS>
where
P: Preset,
E: ExecutionEngine<P> + Clone + Send + Sync + 'static,
W: Wait,
AS: UnboundedSink<ApiMessage<P>>,
TS: UnboundedSink<AttestationVerifierMessage<P, W>>,
PS: UnboundedSink<P2pMessage<P>>,
LS: UnboundedSink<PoolMessage>,
NS: UnboundedSink<SubnetMessage<W>>,
SS: UnboundedSink<SyncMessage<P>>,
VS: UnboundedSink<ValidatorMessage<P, W>>,
Expand All @@ -144,6 +146,7 @@ where
api_tx: AS,
attestation_verifier_tx: TS,
p2p_tx: PS,
pool_tx: LS,
subnet_tx: NS,
sync_tx: SS,
validator_tx: VS,
Expand All @@ -166,6 +169,7 @@ where
api_tx,
attestation_verifier_tx,
p2p_tx,
pool_tx,
subnet_tx,
sync_tx,
validator_tx,
Expand Down Expand Up @@ -381,6 +385,7 @@ where
self.update_store_snapshot();

ValidatorMessage::Tick(wait_group.clone(), tick).send(&self.validator_tx);
PoolMessage::Tick(tick).send(&self.pool_tx);

if changes.is_slot_updated() {
let slot = tick.slot;
Expand All @@ -391,6 +396,7 @@ where
self.retry_delayed(delayed, wait_group);
}

PoolMessage::Slot(slot).send(&self.pool_tx);
P2pMessage::Slot(slot).send(&self.p2p_tx);
SubnetMessage::Slot(wait_group.clone(), slot).send(&self.subnet_tx);

Expand Down Expand Up @@ -2277,12 +2283,14 @@ where
let (high_priority_tasks, low_priority_tasks) = self.thread_pool.task_counts();

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_block",
self.delayed_until_block.len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_block_blocks",
self.delayed_until_block
Expand All @@ -2292,6 +2300,7 @@ where
);

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_block_attestations",
self.delayed_until_block
Expand All @@ -2301,6 +2310,7 @@ where
);

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_block_aggregates",
self.delayed_until_block
Expand All @@ -2310,12 +2320,14 @@ where
);

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_slot",
self.delayed_until_slot.len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_slot_blocks",
self.delayed_until_slot
Expand All @@ -2325,6 +2337,7 @@ where
);

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_slot_attestations",
self.delayed_until_slot
Expand All @@ -2334,6 +2347,7 @@ where
);

metrics.set_collection_length(
module_path!(),
&type_name,
"delayed_until_slot_aggregates",
self.delayed_until_slot
Expand All @@ -2342,8 +2356,19 @@ where
.sum(),
);

metrics.set_collection_length(&type_name, "high_priority_tasks", high_priority_tasks);
metrics.set_collection_length(&type_name, "low_priority_tasks", low_priority_tasks);
metrics.set_collection_length(
module_path!(),
&type_name,
"high_priority_tasks",
high_priority_tasks,
);

metrics.set_collection_length(
module_path!(),
&type_name,
"low_priority_tasks",
low_priority_tasks,
);

self.store.track_collection_metrics(metrics);
}
Expand Down
1 change: 1 addition & 0 deletions fork_choice_control/src/specialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ where
futures::sink::drain(),
futures::sink::drain(),
futures::sink::drain(),
futures::sink::drain(),
storage,
core::iter::empty(),
)
Expand Down
37 changes: 33 additions & 4 deletions fork_choice_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3005,11 +3005,29 @@ impl<P: Preset> Store<P> {
pub fn track_collection_metrics(&self, metrics: &Arc<Metrics>) {
let type_name = tynm::type_name::<Self>();

metrics.set_collection_length(&type_name, "blob_store", self.blob_cache.size());
metrics.set_collection_length(&type_name, "finalized", self.finalized().len());
metrics.set_collection_length(&type_name, "unfinalized", self.unfinalized().len());
metrics.set_collection_length(
module_path!(),
&type_name,
"blob_store",
self.blob_cache.size(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"finalized",
self.finalized().len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"unfinalized",
self.unfinalized().len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"unfinalized_segment_blocks",
self.unfinalized
Expand All @@ -3019,38 +3037,49 @@ impl<P: Preset> Store<P> {
);

metrics.set_collection_length(
module_path!(),
&type_name,
"finalized_indices",
self.finalized_indices.len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"unfinalized_locations",
self.unfinalized_locations.len(),
);

metrics.set_collection_length(
"fork_choice_store",
&type_name,
"justified_active_balances",
self.justified_active_balances.len(),
);

metrics.set_collection_length(&type_name, "latest_messages", self.latest_messages.len());
metrics.set_collection_length(
"fork_choice_store",
&type_name,
"latest_messages",
self.latest_messages.len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"checkpoint_states",
self.checkpoint_states.len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"current_slot_attestations",
self.current_slot_attestations.len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"preprocessed_states",
self.preprocessed_states.len(),
Expand Down
2 changes: 2 additions & 0 deletions http_api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<P: Preset> Context<P> {
let (fc_to_attestation_verifier_tx, fc_to_attestation_verifier_rx) =
futures::channel::mpsc::unbounded();
let (fc_to_p2p_tx, fc_to_p2p_rx) = futures::channel::mpsc::unbounded();
let (fc_to_pool_tx, _) = futures::channel::mpsc::unbounded();
let (fc_to_subnet_tx, fc_to_subnet_rx) = futures::channel::mpsc::unbounded();
let (fc_to_sync_tx, fc_to_sync_rx) = futures::channel::mpsc::unbounded();
let (fc_to_validator_tx, fc_to_validator_rx) = futures::channel::mpsc::unbounded();
Expand Down Expand Up @@ -183,6 +184,7 @@ impl<P: Preset> Context<P> {
fc_to_api_tx,
fc_to_attestation_verifier_tx,
fc_to_p2p_tx,
fc_to_pool_tx,
fc_to_subnet_tx,
fc_to_sync_tx,
fc_to_validator_tx,
Expand Down
2 changes: 2 additions & 0 deletions liveness_tracker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,14 @@ impl<P: Preset, W: Wait> LivenessTracker<P, W> {
let type_name = tynm::type_name::<Self>();

metrics.set_collection_length(
module_path!(),
&type_name,
"live_validators_epochs",
self.live_validators.keys().len(),
);

metrics.set_collection_length(
module_path!(),
&type_name,
"live_validators_indexes",
self.live_validators.values().map(BitVec::len).sum(),
Expand Down
1 change: 1 addition & 0 deletions operation_pools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tap = { workspace = true }
tokio = { workspace = true }
transition_functions = { workspace = true }
try_from_iterator = { workspace = true }
tynm = { workspace = true }
typenum = { workspace = true }
types = { workspace = true }

Expand Down
2 changes: 2 additions & 0 deletions operation_pools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub use crate::{
bls_to_execution_change_pool::{
BlsToExecutionChangePool, Service as BlsToExecutionChangePoolService,
},
manager::Manager,
messages::{PoolToApiMessage, PoolToLivenessMessage, PoolToP2pMessage},
misc::{Origin, PoolAdditionOutcome, PoolRejectionReason},
sync_committee_agg_pool::Manager as SyncCommitteeAggPool,
Expand All @@ -20,6 +21,7 @@ mod attestation_agg_pool {
}

mod bls_to_execution_change_pool;
mod manager;
mod messages;
mod misc;

Expand Down
Loading

0 comments on commit 8c7fe09

Please sign in to comment.