diff --git a/bin/mev/src/cmd/boost.rs b/bin/mev/src/cmd/boost.rs index 5537ab51..273ee327 100644 --- a/bin/mev/src/cmd/boost.rs +++ b/bin/mev/src/cmd/boost.rs @@ -21,7 +21,9 @@ impl Command { info!("configured for `{network}`"); if let Some(config) = config.boost { - Ok(Service::from(network, config).spawn().await?.await?) + let service = Service::from(network, config); + let handle = service.spawn()?; + Ok(handle.await?) } else { Err(eyre::eyre!("missing boost config from file provided")) } diff --git a/mev-boost-rs/src/relay_mux.rs b/mev-boost-rs/src/relay_mux.rs index 287fd1ee..a6494996 100644 --- a/mev-boost-rs/src/relay_mux.rs +++ b/mev-boost-rs/src/relay_mux.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use ethereum_consensus::{ - primitives::{BlsPublicKey, Epoch, Slot, U256}, + crypto::KzgCommitment, + primitives::{BlsPublicKey, Hash32, Slot, U256}, state_transition::Context, }; use futures_util::{stream, StreamExt}; @@ -16,23 +17,22 @@ use mev_rs::{ use parking_lot::Mutex; use rand::prelude::*; use std::{cmp::Ordering, collections::HashMap, ops::Deref, sync::Arc, time::Duration}; +use tokio::time::timeout; use tracing::{debug, info, warn}; -// See note in the `mev-relay-rs::Relay` about this constant. -// TODO likely drop this feature... -const PROPOSAL_TOLERANCE_DELAY: Slot = 1; +// Track an auction for this amount of time, in slots. +const AUCTION_LIFETIME: u64 = 2; +// Give relays this amount of time in seconds to process validator registrations. +const VALIDATOR_REGISTRATION_TIME_OUT_SECS: u64 = 4; // Give relays this amount of time in seconds to return bids. const FETCH_BEST_BID_TIME_OUT_SECS: u64 = 1; +// Give relays this amount of time in seconds to respond with a payload. +const FETCH_PAYLOAD_TIME_OUT_SECS: u64 = 1; -fn bid_key_from( - signed_block: &SignedBlindedBeaconBlock, - public_key: &BlsPublicKey, -) -> AuctionRequest { - let slot = signed_block.message().slot(); - let parent_hash = - signed_block.message().body().execution_payload_header().parent_hash().clone(); - - AuctionRequest { slot, parent_hash, public_key: public_key.clone() } +#[derive(Debug)] +struct AuctionContext { + slot: Slot, + relays: Vec>, } fn validate_bid( @@ -52,6 +52,35 @@ fn validate_bid( .map_err(Into::into) } +fn validate_payload( + contents: &AuctionContents, + expected_block_hash: &Hash32, + expected_commitments: Option<&[KzgCommitment]>, +) -> Result<(), BoostError> { + let provided_block_hash = contents.execution_payload().block_hash(); + if expected_block_hash != provided_block_hash { + return Err(BoostError::InvalidPayloadHash { + expected: expected_block_hash.clone(), + provided: provided_block_hash.clone(), + }) + } + let provided_commitments = contents.blobs_bundle().map(|bundle| &bundle.commitments); + match (expected_commitments, provided_commitments) { + (Some(expected), Some(provided)) => { + if expected == provided.as_ref() { + Ok(()) + } else { + Err(BoostError::InvalidPayloadBlobs { + expected: expected.to_vec(), + provided: provided.to_vec(), + }) + } + } + (None, None) => Ok(()), + _ => Err(BoostError::InvalidPayloadUnexpectedBlobs), + } +} + // Select the most valuable bids in `bids`, breaking ties by `block_hash` fn select_best_bids(bids: impl Iterator) -> Vec { let (best_indices, _value) = @@ -81,42 +110,39 @@ impl Deref for RelayMux { pub struct Inner { relays: Vec>, - context: Context, + context: Arc, state: Mutex, } #[derive(Debug, Default)] struct State { - // map from bid requests to index of `Relay` in collection - outstanding_bids: HashMap>>, - current_epoch_registration_count: usize, - latest_pubkey: BlsPublicKey, + outstanding_bids: HashMap>, } impl RelayMux { - pub fn new(relays: impl Iterator, context: Context) -> Self { - let inner = - Inner { relays: relays.map(Arc::new).collect(), context, state: Default::default() }; + pub fn new(relays: Vec, context: Arc) -> Self { + let inner = Inner { + relays: relays.into_iter().map(Arc::new).collect(), + context, + state: Default::default(), + }; Self(Arc::new(inner)) } pub fn on_slot(&self, slot: Slot) { debug!(slot, "processing"); + let retain_slot = slot - AUCTION_LIFETIME; let mut state = self.state.lock(); - state - .outstanding_bids - .retain(|auction_request, _| auction_request.slot + PROPOSAL_TOLERANCE_DELAY >= slot); + state.outstanding_bids.retain(|_, auction| auction.slot >= retain_slot); } - pub fn on_epoch(&self, epoch: Epoch) { - debug!(epoch, "processing"); - let count = { - let mut state = self.state.lock(); - let count = state.current_epoch_registration_count; - state.current_epoch_registration_count = 0; - count - }; - info!(count, epoch, "processed validator registrations") + fn get_context(&self, key: &Hash32) -> Result, Error> { + let state = self.state.lock(); + state + .outstanding_bids + .get(key) + .cloned() + .ok_or_else::(|| BoostError::MissingOpenBid(key.clone()).into()) } } @@ -128,28 +154,33 @@ impl BlindedBlockProvider for RelayMux { ) -> Result<(), Error> { let responses = stream::iter(self.relays.iter().cloned()) .map(|relay| async { - let response = relay.register_validators(registrations).await; - (relay, response) + let request = relay.register_validators(registrations); + let duration = Duration::from_secs(VALIDATOR_REGISTRATION_TIME_OUT_SECS); + let result = timeout(duration, request).await; + (relay, result) }) .buffer_unordered(self.relays.len()) + .filter_map(|(relay, result)| async move { + match result { + Ok(Ok(_)) => Some(()), + Ok(Err(err)) => { + warn!(%err, %relay, "failure when registering validator(s)"); + None + } + Err(_) => { + warn!(%relay, "timeout when registering validator(s)"); + None + } + } + }) .collect::>() .await; - let mut num_failures = 0; - for (relay, response) in responses { - if let Err(err) = response { - num_failures += 1; - warn!(%relay, %err, "failed to register validator"); - } - } - - if num_failures == self.relays.len() { + if responses.is_empty() { Err(BoostError::CouldNotRegister.into()) } else { let count = registrations.len(); info!(count, "sent validator registrations"); - let mut state = self.state.lock(); - state.current_epoch_registration_count += registrations.len(); Ok(()) } } @@ -160,16 +191,14 @@ impl BlindedBlockProvider for RelayMux { ) -> Result { let bids = stream::iter(self.relays.iter().cloned()) .map(|relay| async { - let response = tokio::time::timeout( - Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS), - relay.fetch_best_bid(auction_request), - ) - .await; - (relay, response) - }) + let request = relay.fetch_best_bid(auction_request); + let duration = Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS); + let result = timeout(duration, request).await; + (relay, result) + }) .buffer_unordered(self.relays.len()) - .filter_map(|(relay, response)| async { - match response { + .filter_map(|(relay, result)| async { + match result { Ok(Ok(bid)) => { if let Err(err) = validate_bid(&bid, &relay.public_key, &self.context) { warn!(%err, %relay, "invalid signed builder bid"); @@ -221,19 +250,20 @@ impl BlindedBlockProvider for RelayMux { } } - let relays_desc = best_relays - .iter() - .map(|relay| format!("{relay}")) - .reduce(|desc, next| format!("{desc}, {next}")) - .expect("at least one relay"); - info!(%auction_request, %best_bid, relays=relays_desc, "acquired best bid"); + let slot = auction_request.slot; + info!( + slot, + parent_hash = ?auction_request.parent_hash, + public_key = ?auction_request.public_key, + %best_bid, + relays = ?best_relays, + "acquired best bid" + ); { let mut state = self.state.lock(); - // assume the next request to open a bid corresponds to the current request - // TODO consider if the relay mux should have more knowledge about the proposal - state.latest_pubkey = auction_request.public_key.clone(); - state.outstanding_bids.insert(auction_request.clone(), best_relays); + let auction_context = AuctionContext { slot, relays: best_relays }; + state.outstanding_bids.insert(best_block_hash.clone(), Arc::new(auction_context)); } Ok(best_bid.clone()) @@ -243,42 +273,47 @@ impl BlindedBlockProvider for RelayMux { &self, signed_block: &SignedBlindedBeaconBlock, ) -> Result { - let (auction_request, relays) = { - let mut state = self.state.lock(); - let key = bid_key_from(signed_block, &state.latest_pubkey); - // TODO: do not `remove` so this endpoint can be retried - let relays = state - .outstanding_bids - .remove(&key) - .ok_or_else::(|| BoostError::MissingOpenBid.into())?; - (key, relays) - }; + let block = signed_block.message(); + let slot = block.slot(); + let body = block.body(); + let expected_block_hash = body.execution_payload_header().block_hash().clone(); + let context = self.get_context(&expected_block_hash)?; - let signed_block = &signed_block; - let responses = stream::iter(relays) + let responses = stream::iter(context.relays.iter().cloned()) .map(|relay| async move { - let response = relay.open_bid(signed_block).await; - (relay, response) + let request = relay.open_bid(signed_block); + let duration = Duration::from_secs(FETCH_PAYLOAD_TIME_OUT_SECS); + let result = timeout(duration, request).await; + (relay, result) }) .buffer_unordered(self.relays.len()) + .filter_map(|(relay, result)| async move { + match result { + Ok(response) => Some((relay, response)), + Err(_) => { + warn!( %relay, "timeout when opening bid"); + None + } + } + }) .collect::>() .await; - let block = signed_block.message(); - let block_body = block.body(); - let payload_header = block_body.execution_payload_header(); - let expected_block_hash = payload_header.block_hash(); for (relay, response) in responses.into_iter() { match response { - Ok(auction_contents) => { - let block_hash = auction_contents.execution_payload().block_hash(); - if block_hash == expected_block_hash { - info!(%auction_request, %block_hash, %relay, "acquired payload"); + Ok(auction_contents) => match validate_payload( + &auction_contents, + &expected_block_hash, + body.blob_kzg_commitments().map(|commitments| commitments.as_slice()), + ) { + Ok(_) => { + info!(%slot, block_hash = %expected_block_hash, %relay, "acquired payload"); return Ok(auction_contents) - } else { - warn!(?block_hash, ?expected_block_hash, %relay, "incorrect block hash delivered by relay"); } - } + Err(err) => { + warn!(?err, ?relay, "could not validate payload"); + } + }, Err(err) => { warn!(%err, %relay, "error opening bid"); } diff --git a/mev-boost-rs/src/service.rs b/mev-boost-rs/src/service.rs index 92b196b2..eaf24f8a 100644 --- a/mev-boost-rs/src/service.rs +++ b/mev-boost-rs/src/service.rs @@ -4,11 +4,11 @@ use futures_util::StreamExt; use mev_rs::{ blinded_block_provider::Server as BlindedBlockProviderServer, get_genesis_time, - relay::{parse_relay_endpoints, Relay, RelayEndpoint}, + relay::{parse_relay_endpoints, Relay}, Error, }; use serde::Deserialize; -use std::{future::Future, net::Ipv4Addr, pin::Pin, task::Poll}; +use std::{future::Future, net::Ipv4Addr, pin::Pin, sync::Arc, task::Poll}; use tokio::task::{JoinError, JoinHandle}; use tracing::{info, warn}; @@ -29,20 +29,19 @@ impl Default for Config { pub struct Service { host: Ipv4Addr, port: u16, - relays: Vec, + relays: Vec, network: Network, config: Config, } impl Service { pub fn from(network: Network, config: Config) -> Self { - let relays = parse_relay_endpoints(&config.relays); + let relays = parse_relay_endpoints(&config.relays).into_iter().map(Relay::from).collect(); Self { host: config.host, port: config.port, relays, network, config } } - /// Spawns a new [`RelayMux`] and [`BlindedBlockProviderServer`] task - pub async fn spawn(self) -> Result { + pub fn spawn(self) -> Result { let Self { host, port, relays, network, config } = self; if relays.is_empty() { @@ -52,30 +51,20 @@ impl Service { info!(count, ?relays, "configured with relay(s)"); } - let relays = relays.into_iter().map(Relay::from); - - let context = Context::try_from(network)?; - let genesis_time = get_genesis_time(&context, config.beacon_node_url.as_ref(), None).await; - let clock = context.clock_at(genesis_time); - let relay_mux = RelayMux::new(relays, context); + let context = Arc::new(Context::try_from(network)?); + let relay_mux = RelayMux::new(relays, context.clone()); let relay_mux_clone = relay_mux.clone(); let relay_task = tokio::spawn(async move { let relay_mux = relay_mux_clone; + let genesis_time = + get_genesis_time(&context, config.beacon_node_url.as_ref(), None).await; + let clock = context.clock_at(genesis_time); let mut slots = clock.clone().into_stream(); // NOTE: this will block until genesis if we are before the genesis time - let current_slot = slots.next().await.expect("some next slot"); - let mut current_epoch = clock.epoch_for(current_slot); - while let Some(slot) = slots.next().await { relay_mux.on_slot(slot); - - let epoch = clock.epoch_for(slot); - if epoch != current_epoch { - relay_mux.on_epoch(epoch); - current_epoch = epoch; - } } }); @@ -85,9 +74,6 @@ impl Service { } } -/// Contains the handles to spawned [`RelayMux`] and [`BlindedBlockProviderServer`] tasks -/// -/// This struct is created by the [`Service::spawn`] function #[pin_project::pin_project] pub struct ServiceHandle { #[pin] diff --git a/mev-boost-rs/tests/integration.rs b/mev-boost-rs/tests/integration.rs index eb53f84f..e63850ba 100644 --- a/mev-boost-rs/tests/integration.rs +++ b/mev-boost-rs/tests/integration.rs @@ -99,7 +99,7 @@ async fn test_end_to_end() { let mux_port = config.port; let service = Service::from(network, config); - service.spawn().await.unwrap(); + service.spawn().unwrap(); let beacon_node = RelayClient::new(ApiClient::new( Url::parse(&format!("http://127.0.0.1:{mux_port}")).unwrap(), diff --git a/mev-rs/src/error.rs b/mev-rs/src/error.rs index 2fe0094e..a5c31acc 100644 --- a/mev-rs/src/error.rs +++ b/mev-rs/src/error.rs @@ -1,6 +1,7 @@ use crate::types::AuctionRequest; use beacon_api_client::Error as ApiError; use ethereum_consensus::{ + crypto::KzgCommitment, primitives::{BlsPublicKey, ExecutionAddress, Hash32, ValidatorIndex}, Error as ConsensusError, Fork, }; @@ -10,12 +11,20 @@ use thiserror::Error; pub enum BoostError { #[error("bid public key {bid} does not match relay public key {relay}")] BidPublicKeyMismatch { bid: BlsPublicKey, relay: BlsPublicKey }, - #[error("could not find relay with outstanding bid to accept")] - MissingOpenBid, + #[error("could not find relay with outstanding bid to accept for block {0}")] + MissingOpenBid(Hash32), #[error("could not register with any relay")] CouldNotRegister, #[error("no payload returned for opened bid with block hash {0:?}")] MissingPayload(Hash32), + #[error("returned payload block hash {provided} did not match expected {expected}")] + InvalidPayloadHash { expected: Hash32, provided: Hash32 }, + #[error("blobs provided when they were unexpected")] + InvalidPayloadUnexpectedBlobs, + #[error( + "signed block did not match the expected blob commitments ({expected:?} vs {provided:?})" + )] + InvalidPayloadBlobs { expected: Vec, provided: Vec }, } #[derive(Debug, Error)] diff --git a/mev-rs/src/types/mod.rs b/mev-rs/src/types/mod.rs index a64a595e..a3859356 100644 --- a/mev-rs/src/types/mod.rs +++ b/mev-rs/src/types/mod.rs @@ -10,7 +10,7 @@ pub use block_submission::{BidTrace, SignedBidSubmission}; pub use builder_bid::{BuilderBid, SignedBuilderBid}; pub use ethereum_consensus::builder::SignedValidatorRegistration; pub use ethereum_consensus_types::{ - ExecutionPayload, ExecutionPayloadHeader, SignedBlindedBeaconBlock, + BlindedBeaconBlockBody, ExecutionPayload, ExecutionPayloadHeader, SignedBlindedBeaconBlock, }; pub use proposer_schedule::*;