From ae99738d11b5b5ded5a6231f1c9b97526a63fcba Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Wed, 25 Oct 2023 18:00:40 -0600 Subject: [PATCH] wip: implement relay for "trusted" builders --- example.config.toml | 5 + mev-relay-rs/src/relay.rs | 217 ++++++++++++++++++++++--------- mev-relay-rs/src/service.rs | 25 ++-- mev-rs/src/error.rs | 21 ++- mev-rs/src/proposer_scheduler.rs | 91 +++++++++---- mev-rs/src/validator_registry.rs | 148 ++++++++++++--------- 6 files changed, 346 insertions(+), 161 deletions(-) diff --git a/example.config.toml b/example.config.toml index 39a58b9f..279bda32 100644 --- a/example.config.toml +++ b/example.config.toml @@ -12,6 +12,11 @@ host = "0.0.0.0" port = 28545 beacon_node_url = "http://127.0.0.1:5052" secret_key = "0x24b6e79cbc6267c6e527b4bf7a71747d42a58b10279366cf0c7bb4e2aa455901" +accepted_builders = [ + "0xa4476fe970fdd7bd4050955fa1261f60905ff41165cdbdb77d235589d1a090c3e91ae926eba96db77516d5088734818c", + "0x97e7aa4df6b120f30c17fcca3771aa9a37d0a873d2fe74b40f30a6b30458785f895fb82e5be304bd5d687ae18d836d73", + "0x8d48be80acd4aac4123686a01515b36c579e5608ab2114d4d6a7f2af272bb933719cb3b87ac23adb2c3ccec0547557f0", +] [builder] # builder BLS secret key diff --git a/mev-relay-rs/src/relay.rs b/mev-relay-rs/src/relay.rs index 00ffad54..43d202c1 100644 --- a/mev-relay-rs/src/relay.rs +++ b/mev-relay-rs/src/relay.rs @@ -2,22 +2,27 @@ use async_trait::async_trait; use beacon_api_client::mainnet::Client; use ethereum_consensus::{ builder::ValidatorRegistration, - capella::mainnet as capella, clock::get_current_unix_time_in_nanos, crypto::SecretKey, - primitives::{BlsPublicKey, Root, Slot, U256}, + primitives::{BlsPublicKey, Epoch, Root, Slot, U256}, state_transition::Context, }; use mev_rs::{ signing::{compute_consensus_signing_root, sign_builder_message, verify_signature}, types::{ - BidRequest, BuilderBid, ExecutionPayload, ExecutionPayloadHeader, SignedBlindedBeaconBlock, - SignedBuilderBid, SignedValidatorRegistration, + BidRequest, BidTrace, BuilderBid, ExecutionPayload, ExecutionPayloadHeader, + ProposerSchedule, SignedBidSubmission, SignedBlindedBeaconBlock, SignedBuilderBid, + SignedValidatorRegistration, }, - BlindedBlockProvider, Error, ValidatorRegistry, + BlindedBlockProvider, BlindedBlockRelayer, Error, ProposerScheduler, ValidatorRegistry, }; use parking_lot::Mutex; -use std::{collections::HashMap, ops::Deref, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + ops::Deref, + sync::Arc, +}; +use tracing::error; // `PROPOSAL_TOLERANCE_DELAY` controls how aggresively the relay drops "old" execution payloads // once they have been fetched from builders -- currently in response to an incoming request from a @@ -26,6 +31,17 @@ use std::{collections::HashMap, ops::Deref, sync::Arc}; // TODO likely drop this feature... const PROPOSAL_TOLERANCE_DELAY: Slot = 1; +fn to_header(execution_payload: &mut ExecutionPayload) -> Result { + let header = match execution_payload { + ExecutionPayload::Bellatrix(payload) => { + ExecutionPayloadHeader::Bellatrix(payload.try_into()?) + } + ExecutionPayload::Capella(payload) => ExecutionPayloadHeader::Capella(payload.try_into()?), + ExecutionPayload::Deneb(payload) => ExecutionPayloadHeader::Deneb(payload.try_into()?), + }; + Ok(header) +} + fn validate_bid_request(_bid_request: &BidRequest) -> Result<(), Error> { // TODO validations @@ -72,7 +88,7 @@ fn validate_signed_block( let payload_header = body.execution_payload_header(); let block_hash = payload_header.block_hash(); if block_hash != local_block_hash { - return Err(Error::UnknownBlock) + return Err(Error::InvalidExecutionPayloadInBlock) } // OPTIONAL: @@ -104,13 +120,22 @@ pub struct Inner { public_key: BlsPublicKey, genesis_validators_root: Root, validator_registry: ValidatorRegistry, - context: Arc, + proposer_scheduler: ProposerScheduler, + builder_registry: HashSet, + context: Context, state: Mutex, } +#[derive(Debug)] +struct BidContext { + signed_builder_bid: SignedBuilderBid, + execution_payload: ExecutionPayload, + value: U256, +} + #[derive(Debug, Default)] struct State { - execution_payloads: HashMap, + bids: HashMap, } impl Relay { @@ -118,40 +143,88 @@ impl Relay { genesis_validators_root: Root, beacon_node: Client, secret_key: SecretKey, - context: Arc, + accepted_builders: Vec, + context: Context, ) -> Self { let public_key = secret_key.public_key(); - let validator_registry = ValidatorRegistry::new(beacon_node); + let slots_per_epoch = context.slots_per_epoch; + let validator_registry = ValidatorRegistry::new(beacon_node.clone(), slots_per_epoch); + let proposer_scheduler = ProposerScheduler::new(beacon_node, slots_per_epoch); let inner = Inner { secret_key, public_key, genesis_validators_root, validator_registry, + proposer_scheduler, + builder_registry: HashSet::from_iter(accepted_builders), context, state: Default::default(), }; Self(Arc::new(inner)) } - async fn load_full_validator_set(&self) { - if let Err(err) = self.validator_registry.load().await { - tracing::error!("could not load validator set from provided beacon node; please check config and restart: {err}"); + pub async fn on_epoch(&self, epoch: Epoch) { + if let Err(err) = self.validator_registry.on_epoch(epoch).await { + error!(%err, "could not load validator set from provided beacon node"); + } + if let Err(err) = self.proposer_scheduler.on_epoch(epoch, &self.validator_registry).await { + error!(%err, "could not load proposer duties"); + } + } + + pub async fn on_slot(&self, slot: Slot) { + let mut state = self.state.lock(); + state.bids.retain(|bid_request, _| bid_request.slot + PROPOSAL_TOLERANCE_DELAY >= slot); + } + + fn validate_allowed_builder(&self, builder_public_key: &BlsPublicKey) -> Result<(), Error> { + if self.builder_registry.contains(builder_public_key) { + Ok(()) + } else { + Err(Error::BuilderNotRegistered(builder_public_key.clone())) } } - pub async fn initialize(&self) { - self.load_full_validator_set().await; + fn validate_bid_request(&self, bid_request: &BidRequest) -> Result<(), Error> { + validate_bid_request(bid_request) } - pub async fn on_slot(&self, slot: Slot, next_epoch: bool) { - if next_epoch { - // TODO grab validators more efficiently - self.load_full_validator_set().await; + fn validate_builder_submission( + &self, + _bid_trace: &BidTrace, + _execution_payload: &ExecutionPayload, + ) -> Result<(), Error> { + // TODO: + // verify payload matches proposer prefs (and proposer is registered) + // validate_execution_payload(execution_payload, value, preferences) + // verify bid trace block hash matches execution_payload block hash + Ok(()) + } + + fn insert_bid_if_greater( + &self, + bid_request: BidRequest, + mut execution_payload: ExecutionPayload, + value: U256, + ) -> Result<(), Error> { + { + let state = self.state.lock(); + if let Some(bid) = state.bids.get(&bid_request) { + if bid.value > value { + return Ok(()) + } + } } + let header = to_header(&mut execution_payload)?; + let mut bid = + BuilderBid { header, value: value.clone(), public_key: self.public_key.clone() }; + let signature = sign_builder_message(&mut bid, &self.secret_key, &self.context)?; + let signed_builder_bid = SignedBuilderBid { message: bid, signature }; + + let bid_context = BidContext { signed_builder_bid, execution_payload, value }; let mut state = self.state.lock(); - state - .execution_payloads - .retain(|bid_request, _| bid_request.slot + PROPOSAL_TOLERANCE_DELAY >= slot); + state.bids.insert(bid_request, bid_context); + Ok(()) } } @@ -162,41 +235,20 @@ impl BlindedBlockProvider for Relay { registrations: &mut [SignedValidatorRegistration], ) -> Result<(), Error> { let current_time = get_current_unix_time_in_nanos().try_into().expect("fits in type"); - self.validator_registry.validate_registrations( - registrations, - current_time, - &self.context, - )?; - Ok(()) + self.validator_registry + .process_registrations(registrations, current_time, &self.context) + .map_err(Error::RegistrationErrors) } async fn fetch_best_bid(&self, bid_request: &BidRequest) -> Result { - validate_bid_request(bid_request)?; - - let public_key = &bid_request.public_key; - let preferences = self - .validator_registry - .get_preferences(public_key) - .ok_or_else(|| Error::MissingPreferences(public_key.clone()))?; - - let value = U256::default(); - let header = { - let mut payload = ExecutionPayload::Capella(Default::default()); - let mut state = self.state.lock(); - - validate_execution_payload(&payload, &value, &preferences)?; - - let inner = payload.capella_mut().unwrap(); - let inner_header = capella::ExecutionPayloadHeader::try_from(inner)?; - let header = ExecutionPayloadHeader::Capella(inner_header); - - state.execution_payloads.insert(bid_request.clone(), payload); - header - }; - - let mut bid = BuilderBid { header, value, public_key: self.public_key.clone() }; - let signature = sign_builder_message(&mut bid, &self.secret_key, &self.context)?; - Ok(SignedBuilderBid { message: bid, signature }) + self.validate_bid_request(bid_request)?; + + let state = self.state.lock(); + let bid_context = state + .bids + .get(bid_request) + .ok_or_else(|| Error::NoBidPrepared(Box::new(bid_request.clone())))?; + Ok(bid_context.signed_builder_bid.clone()) } async fn open_bid( @@ -209,15 +261,21 @@ impl BlindedBlockProvider for Relay { let payload_header = body.execution_payload_header(); let parent_hash = payload_header.parent_hash().clone(); let proposer_index = block.proposer_index(); - let public_key = - self.validator_registry.get_public_key(proposer_index).map_err(Error::from)?; + let public_key = self + .validator_registry + .get_public_key(proposer_index) + .ok_or(Error::ValidatorIndexNotRegistered(proposer_index))?; let bid_request = BidRequest { slot, parent_hash, public_key }; - let payload = { - let mut state = self.state.lock(); - state.execution_payloads.remove(&bid_request).ok_or(Error::UnknownBid)? - }; + self.validate_bid_request(&bid_request)?; + + let mut state = self.state.lock(); + let bid_context = state + .bids + .remove(&bid_request) + .ok_or_else(|| Error::MissingBid(bid_request.clone()))?; + let payload = bid_context.execution_payload; validate_signed_block( signed_block, &bid_request.public_key, @@ -226,6 +284,43 @@ impl BlindedBlockProvider for Relay { &self.context, )?; + // TODO: any other validations required here? + Ok(payload) } } + +#[async_trait] +impl BlindedBlockRelayer for Relay { + async fn get_proposal_schedule(&self) -> Result, Error> { + self.proposer_scheduler.get_proposal_schedule().map_err(Into::into) + } + + async fn submit_bid(&self, signed_submission: &mut SignedBidSubmission) -> Result<(), Error> { + let (bid_request, value) = { + let bid_trace = &signed_submission.message; + let builder_public_key = &bid_trace.builder_public_key; + self.validate_allowed_builder(builder_public_key)?; + + let bid_request = BidRequest { + slot: bid_trace.slot, + parent_hash: bid_trace.parent_hash.clone(), + public_key: bid_trace.proposer_public_key.clone(), + }; + self.validate_bid_request(&bid_request)?; + + self.validate_builder_submission(bid_trace, &signed_submission.execution_payload)?; + (bid_request, bid_trace.value.clone()) + }; + + signed_submission.verify_signature(&self.context)?; + + let execution_payload = signed_submission.execution_payload.clone(); + // NOTE: this does _not_ respect cancellations + // TODO: move to regime where we track best bid by builder + // and also move logic to cursor best bid for auction off this API + self.insert_bid_if_greater(bid_request, execution_payload, value)?; + + Ok(()) + } +} diff --git a/mev-relay-rs/src/service.rs b/mev-relay-rs/src/service.rs index 5d64c6c3..c3674c51 100644 --- a/mev-relay-rs/src/service.rs +++ b/mev-relay-rs/src/service.rs @@ -3,12 +3,13 @@ use beacon_api_client::mainnet::Client; use ethereum_consensus::{ crypto::SecretKey, networks::{self, Network}, + primitives::BlsPublicKey, state_transition::Context, }; use futures::StreamExt; use mev_rs::{blinded_block_provider::Server as BlindedBlockProviderServer, Error}; use serde::Deserialize; -use std::{future::Future, net::Ipv4Addr, pin::Pin, sync::Arc, task::Poll}; +use std::{future::Future, net::Ipv4Addr, pin::Pin, task::Poll}; use tokio::task::{JoinError, JoinHandle}; use url::Url; @@ -18,6 +19,7 @@ pub struct Config { pub port: u16, pub beacon_node_url: String, pub secret_key: SecretKey, + pub accepted_builders: Vec, } impl Default for Config { @@ -27,6 +29,7 @@ impl Default for Config { port: 28545, beacon_node_url: "http://127.0.0.1:5052".into(), secret_key: Default::default(), + accepted_builders: Default::default(), } } } @@ -37,6 +40,7 @@ pub struct Service { beacon_node: Client, network: Network, secret_key: SecretKey, + accepted_builders: Vec, } impl Service { @@ -49,24 +53,29 @@ impl Service { beacon_node, network, secret_key: config.secret_key, + accepted_builders: config.accepted_builders, } } /// Configures the [`Relay`] and the [`BlindedBlockProviderServer`] and spawns both to /// individual tasks pub async fn spawn(self) -> Result { - let Self { host, port, beacon_node, network, secret_key } = self; + let Self { host, port, beacon_node, network, secret_key, accepted_builders } = self; let context = Context::try_from(network)?; let clock = context.clock().unwrap_or_else(|| { let genesis_time = networks::typical_genesis_time(&context); context.clock_at(genesis_time) }); - let context = Arc::new(context); let genesis_details = beacon_node.get_genesis_details().await?; let genesis_validators_root = genesis_details.genesis_validators_root; - let relay = Relay::new(genesis_validators_root, beacon_node, secret_key, context); - relay.initialize().await; + let relay = Relay::new( + genesis_validators_root, + beacon_node, + secret_key, + accepted_builders, + context, + ); let block_provider = relay.clone(); let server = BlindedBlockProviderServer::new(host, port, block_provider).spawn(); @@ -77,14 +86,14 @@ impl Service { tokio::pin!(slots); let mut current_epoch = clock.current_epoch().expect("after genesis"); - let mut next_epoch = false; + relay.on_epoch(current_epoch).await; while let Some(slot) = slots.next().await { let epoch = clock.epoch_for(slot); if epoch > current_epoch { current_epoch = epoch; - next_epoch = true; + relay.on_epoch(epoch).await; } - relay.on_slot(slot, next_epoch).await; + relay.on_slot(slot).await; } }); diff --git a/mev-rs/src/error.rs b/mev-rs/src/error.rs index 9de8ac89..f0a0cd46 100644 --- a/mev-rs/src/error.rs +++ b/mev-rs/src/error.rs @@ -1,7 +1,7 @@ use crate::types::BidRequest; use beacon_api_client::Error as ApiError; use ethereum_consensus::{ - primitives::{BlsPublicKey, ExecutionAddress, Hash32, Slot}, + primitives::{BlsPublicKey, ExecutionAddress, Hash32, Slot, ValidatorIndex}, Error as ConsensusError, }; use thiserror::Error; @@ -20,26 +20,33 @@ pub enum Error { MissingProposer(Slot), #[error("could not register with any relay")] CouldNotRegister, - #[error("no preferences found for validator with public key {0:?}")] - MissingPreferences(BlsPublicKey), + // #[error("no preferences found for validator with public key {0:?}")] + // MissingPreferences(BlsPublicKey), #[error("no payload returned for opened bid with block hash {0:?}")] MissingPayload(Hash32), #[error("payload gas limit does not match the proposer's preference")] InvalidGasLimit, #[error("data for an unexpected fork was provided")] InvalidFork, - #[error("block does not match the provided header")] - UnknownBlock, - #[error("payload request does not match any outstanding bid")] - UnknownBid, + #[error("execution payload does not match the provided header")] + InvalidExecutionPayloadInBlock, #[error("validator {0:?} does not have {1:?} fee recipient")] UnknownFeeRecipient(BlsPublicKey, ExecutionAddress), + + #[error("missing payload for {0}")] + MissingBid(BidRequest), #[error("validator with public key {0:?} is not currently registered")] ValidatorNotRegistered(BlsPublicKey), + #[error("validator with index {0} is not currently registered")] + ValidatorIndexNotRegistered(ValidatorIndex), + #[error("builder with public key {0:?} is not currently registered")] + BuilderNotRegistered(BlsPublicKey), #[error(transparent)] ValidatorRegistry(#[from] crate::validator_registry::Error), #[error(transparent)] ProposerScheduler(#[from] crate::proposer_scheduler::Error), + #[error("validator registration errors: {0:?}")] + RegistrationErrors(Vec), #[error(transparent)] Consensus(#[from] ConsensusError), #[error(transparent)] diff --git a/mev-rs/src/proposer_scheduler.rs b/mev-rs/src/proposer_scheduler.rs index 578eed2f..59f6a97d 100644 --- a/mev-rs/src/proposer_scheduler.rs +++ b/mev-rs/src/proposer_scheduler.rs @@ -1,10 +1,9 @@ -use beacon_api_client::{ - mainnet::Client, BeaconProposerRegistration, Error as ApiError, ProposerDuty, -}; -use ethereum_consensus::primitives::{BlsPublicKey, Epoch, Slot}; +use crate::{types::ProposerSchedule, validator_registry::ValidatorRegistry}; +use beacon_api_client::{mainnet::Client, Error as ApiError, ProposerDuty}; +use ethereum_consensus::primitives::{Epoch, Slot}; use parking_lot::Mutex; -use std::collections::HashMap; use thiserror::Error; +use tracing::warn; #[derive(Debug, Error)] pub enum Error { @@ -14,40 +13,88 @@ pub enum Error { pub struct ProposerScheduler { api: Client, + slots_per_epoch: Slot, state: Mutex, } #[derive(Default)] struct State { - proposer_schedule: HashMap, + // schedules are monotonically increasing by `slot` + // but may not be contiguous as schedules are created only + // if we have a valid registration from the proposer + proposer_schedule: Vec, } impl ProposerScheduler { - pub fn new(api: Client) -> Self { - Self { api, state: Default::default() } + pub fn new(api: Client, slots_per_epoch: Slot) -> Self { + Self { api, slots_per_epoch, state: Default::default() } } - pub async fn dispatch_proposer_preparations( + async fn fetch_duties_if_missing( &self, - preparations: &[BeaconProposerRegistration], + epoch: Epoch, + all_duties: &mut Vec, ) -> Result<(), Error> { - self.api.prepare_proposers(preparations).await.map_err(From::from) - } - - pub async fn fetch_duties(&self, epoch: Epoch) -> Result, Error> { + { + let slot = epoch * self.slots_per_epoch; + let state = self.state.lock(); + if state.proposer_schedule.iter().any(|schedule| schedule.slot >= slot) { + return Ok(()) + } + } // TODO be tolerant to re-orgs let (_dependent_root, duties) = self.api.get_proposer_duties(epoch).await?; - let mut state = self.state.lock(); - for duty in &duties { - let slot = duty.slot; - let public_key = &duty.public_key; - state.proposer_schedule.insert(slot, public_key.clone()); + all_duties.extend(duties); + Ok(()) + } + + // Fetches proposer duties for the current epoch `epoch` and the next epoch. + async fn fetch_new_duties(&self, epoch: Epoch) -> Vec { + let mut duties = vec![]; + if let Err(err) = self.fetch_duties_if_missing(epoch, &mut duties).await { + warn!(%err, epoch, "could not get proposer duties from consensus"); + } + if let Err(err) = self.fetch_duties_if_missing(epoch + 1, &mut duties).await { + warn!(%err, epoch = epoch + 1, "could not get proposer duties from consensus"); } - Ok(duties) + duties + } + + pub async fn on_epoch( + &self, + epoch: Epoch, + validator_registry: &ValidatorRegistry, + ) -> Result<(), Error> { + let extension = self + .fetch_new_duties(epoch) + .await + .iter() + .filter_map(|duty| { + let public_key = &duty.public_key; + validator_registry.get_signed_registration(public_key).map(|entry| { + ProposerSchedule { + slot: duty.slot, + validator_index: duty.validator_index, + entry: entry.clone(), + } + }) + }) + // collect so we do the work *before* grabbing the lock + .collect::>(); + + let slot = epoch * self.slots_per_epoch; + let mut state = self.state.lock(); + // drop old schedules + state.proposer_schedule.retain(|schedule| schedule.slot >= slot); + // add new schedules + state.proposer_schedule.extend(extension); + Ok(()) } - pub fn get_proposer_for(&self, slot: Slot) -> Option { + pub fn get_proposal_schedule(&self) -> Result, Error> { + // NOTE: if external APIs hold, then the expected schedules are + // those currently in the `state`. let state = self.state.lock(); - state.proposer_schedule.get(&slot).cloned() + Ok(state.proposer_schedule.clone()) } } diff --git a/mev-rs/src/validator_registry.rs b/mev-rs/src/validator_registry.rs index 4102b1c4..f8df8c47 100644 --- a/mev-rs/src/validator_registry.rs +++ b/mev-rs/src/validator_registry.rs @@ -7,20 +7,23 @@ use beacon_api_client::{ }; use ethereum_consensus::{ builder::ValidatorRegistration, - primitives::{BlsPublicKey, ExecutionAddress, ValidatorIndex}, + primitives::{BlsPublicKey, Epoch, Slot, ValidatorIndex}, state_transition::Context, Error as ConsensusError, }; -use parking_lot::Mutex; +use parking_lot::RwLock; +use rayon::prelude::*; use std::{cmp::Ordering, collections::HashMap}; use thiserror::Error; #[derive(Debug, Error)] pub enum Error { - #[error("invalid timestamp")] - InvalidTimestamp, - #[error("validator {0} had an invalid status {1}")] - InactiveValidator(BlsPublicKey, ValidatorStatus), + #[error("local time is {1} but registration has timestamp from future: {0:?}")] + FutureRegistration(ValidatorRegistration, u64), + #[error("validator has registration from timestamp {1}; outdated registration: {0:?}")] + OutdatedRegistration(ValidatorRegistration, u64), + #[error("registration is for validator with invalid status {1}: {0:?}")] + ValidatorStatus(ValidatorRegistration, ValidatorStatus), #[error("missing knowledge of pubkey in validator set")] UnknownPubkey, #[error("missing knowledge of index in validator set")] @@ -32,11 +35,12 @@ pub enum Error { } fn validate_registration_is_not_from_future( - timestamp: u64, + message: &ValidatorRegistration, current_timestamp: u64, ) -> Result<(), Error> { + let timestamp = message.timestamp; if timestamp > current_timestamp + 10 { - Err(Error::InvalidTimestamp) + Err(Error::FutureRegistration(message.clone(), current_timestamp)) } else { Ok(()) } @@ -60,37 +64,43 @@ enum ValidatorRegistrationStatus { } fn validate_validator_status( + message: &ValidatorRegistration, status: ValidatorStatus, - public_key: &BlsPublicKey, ) -> Result<(), Error> { if matches!(status, ValidatorStatus::Pending | ValidatorStatus::ActiveOngoing) { Ok(()) } else { - Err(Error::InactiveValidator(public_key.clone(), status)) + Err(Error::ValidatorStatus(message.clone(), status)) } } #[derive(Default, Debug)] pub struct State { + // data from registered validators validator_preferences: HashMap, + // data from consensus validators: HashMap, pubkeys_by_index: HashMap, } +// Maintains validators we are aware of pub struct ValidatorRegistry { client: Client, - state: Mutex, + slots_per_epoch: Slot, + state: RwLock, } impl ValidatorRegistry { - pub fn new(client: Client) -> Self { - let state = State::default(); - Self { client, state: Mutex::new(state) } + pub fn new(client: Client, slots_per_epoch: Slot) -> Self { + let state = RwLock::new(Default::default()); + Self { client, slots_per_epoch, state } } - pub async fn load(&self) -> Result<(), Error> { - let summaries = self.client.get_validators(StateId::Head, &[], &[]).await?; - let mut state = self.state.lock(); + // TODO: load more efficiently + pub async fn on_epoch(&self, epoch: Epoch) -> Result<(), Error> { + let slot = epoch * self.slots_per_epoch; + let summaries = self.client.get_validators(StateId::Slot(slot), &[], &[]).await?; + let mut state = self.state.write(); for summary in summaries.into_iter() { let public_key = summary.validator.public_key.clone(); state.pubkeys_by_index.insert(summary.index, public_key.clone()); @@ -99,66 +109,56 @@ impl ValidatorRegistry { Ok(()) } - pub fn get_public_key(&self, index: ValidatorIndex) -> Result { - let state = self.state.lock(); - state.pubkeys_by_index.get(&index).cloned().ok_or(Error::UnknownIndex) - } - - pub fn get_validator_index(&self, public_key: &BlsPublicKey) -> Option { - let state = self.state.lock(); - state.validators.get(public_key).map(|v| v.index) + pub fn get_public_key(&self, index: ValidatorIndex) -> Option { + let state = self.state.read(); + state.pubkeys_by_index.get(&index).cloned() } - pub fn get_preferences(&self, public_key: &BlsPublicKey) -> Option { - let state = self.state.lock(); - state.validator_preferences.get(public_key).map(|registration| registration.message.clone()) - } + // pub fn get_validator_index(&self, public_key: &BlsPublicKey) -> Option { + // let state = self.state.read(); + // state.validators.get(public_key).map(|v| v.index) + // } - pub fn find_public_key_by_fee_recipient( + pub fn get_signed_registration( &self, - fee_recipient: &ExecutionAddress, - ) -> Option { - let state = self.state.lock(); - state - .validator_preferences - .iter() - .find(|&(_, preferences)| &preferences.message.fee_recipient == fee_recipient) - .map(|(key, _)| key.clone()) + public_key: &BlsPublicKey, + ) -> Option { + let state = self.state.read(); + state.validator_preferences.get(public_key).cloned() } - pub fn validate_registrations( - &self, - registrations: &mut [SignedValidatorRegistration], + // pub fn find_public_key_by_fee_recipient( + // &self, + // fee_recipient: &ExecutionAddress, + // ) -> Option { + // let state = self.state.lock(); + // state + // .validator_preferences + // .iter() + // .find(|&(_, preferences)| &preferences.message.fee_recipient == fee_recipient) + // .map(|(key, _)| key.clone()) + // } + + fn process_registration<'a>( + &'a self, + registration: &'a mut SignedValidatorRegistration, current_timestamp: u64, context: &Context, - ) -> Result<(), Error> { - for registration in registrations.iter_mut() { - // TODO one failure should not fail the others... - self.validate_registration(registration, current_timestamp, context)?; - } - Ok(()) - } - - fn validate_registration( - &self, - registration: &mut SignedValidatorRegistration, - current_timestamp: u64, - context: &Context, - ) -> Result<(), Error> { - let mut state = self.state.lock(); + ) -> Result, Error> { + let state = self.state.read(); let latest_timestamp = state .validator_preferences .get(®istration.message.public_key) .map(|r| r.message.timestamp); let message = &mut registration.message; - validate_registration_is_not_from_future(message.timestamp, current_timestamp)?; + validate_registration_is_not_from_future(message, current_timestamp)?; let registration_status = if let Some(latest_timestamp) = latest_timestamp { let status = determine_validator_registration_status(message.timestamp, latest_timestamp); if matches!(status, ValidatorRegistrationStatus::Outdated) { - return Err(Error::InvalidTimestamp) + return Err(Error::OutdatedRegistration(message.clone(), latest_timestamp)) } status } else { @@ -171,16 +171,38 @@ impl ValidatorRegistry { .get(public_key) .map(|validator| validator.status) .ok_or(Error::UnknownPubkey)?; - validate_validator_status(validator_status, public_key)?; + validate_validator_status(message, validator_status)?; let signing_root = compute_builder_signing_root(message, context)?; let public_key = &message.public_key; verify_signature(public_key, signing_root.as_ref(), ®istration.signature)?; - if matches!(registration_status, ValidatorRegistrationStatus::New) { - let public_key = registration.message.public_key.clone(); - state.validator_preferences.insert(public_key, registration.clone()); + let update = if matches!(registration_status, ValidatorRegistrationStatus::New) { + Some(registration) + } else { + None + }; + Ok(update) + } + + pub fn process_registrations( + &self, + registrations: &mut [SignedValidatorRegistration], + current_timestamp: u64, + context: &Context, + ) -> Result<(), Vec> { + let (updates, errs): (Vec<_>, Vec<_>) = registrations + .par_iter_mut() + .map(|registration| self.process_registration(registration, current_timestamp, context)) + .partition(|result| result.is_ok()); + let mut state = self.state.write(); + for update in updates { + if let Some(signed_registration) = update.expect("validated successfully") { + let public_key = signed_registration.message.public_key.clone(); + state.validator_preferences.insert(public_key, signed_registration.clone()); + } } - Ok(()) + + Err(errs.into_iter().map(|err| err.expect_err("validation failed")).collect()) } }