Skip to content

Commit

Permalink
harden validation and add timeouts for requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ralexstokes committed May 12, 2024
1 parent 4c70b4c commit c06d555
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 51 deletions.
106 changes: 57 additions & 49 deletions mev-boost-rs/src/relay_mux.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use ethereum_consensus::{
deneb::polynomial_commitments::KzgCommitment,
crypto::KzgCommitment,
primitives::{BlsPublicKey, Hash32, Slot, U256},
state_transition::Context,
};
Expand All @@ -17,18 +17,22 @@ use mev_rs::{
use parking_lot::Mutex;
use rand::prelude::*;
use std::{cmp::Ordering, collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use tokio::time::timeout;
use tracing::{debug, info, warn};

// Track an auction for this amount of time, in slots.
const AUCTION_LIFETIME: u64 = 2;
// Give relays this amount of time in seconds to process validator registrations.
const VALIDATOR_REGISTRATION_TIME_OUT_SECS: u64 = 4;
// Give relays this amount of time in seconds to return bids.
const FETCH_BEST_BID_TIME_OUT_SECS: u64 = 1;
// Give relays this amount of time in seconds to respond with a payload.
const FETCH_PAYLOAD_TIME_OUT_SECS: u64 = 1;

#[derive(Debug)]
struct AuctionContext {
slot: Slot,
relays: Vec<Arc<Relay>>,
blob_commitments: Option<Vec<KzgCommitment>>,
}

fn validate_bid(
Expand All @@ -49,7 +53,6 @@ fn validate_bid(
}

fn validate_payload(
context: &AuctionContext,
contents: &AuctionContents,
expected_block_hash: &Hash32,
expected_commitments: Option<&[KzgCommitment]>,
Expand All @@ -62,20 +65,19 @@ fn validate_payload(
})
}
let provided_commitments = contents.blobs_bundle().map(|bundle| &bundle.commitments);
match (&context.blob_commitments, expected_commitments, provided_commitments) {
(Some(bid), Some(expected), Some(provided)) => {
let bid_matches = bid == expected;
let provided_matches = bid == provided.as_ref();
if bid_matches && provided_matches {
match (expected_commitments, provided_commitments) {
(Some(expected), Some(provided)) => {
if expected == provided.as_ref() {
Ok(())
} else {
// TODO: provide error data
Err(BoostError::InvalidPayloadBlobs)
Err(BoostError::InvalidPayloadBlobs {
expected: expected.to_vec(),
provided: provided.to_vec(),
})
}
}
(None, None, None) => Ok(()),
// TODO: provide error data
_ => Err(BoostError::InvalidPayloadBlobs),
(None, None) => Ok(()),
_ => Err(BoostError::InvalidPayloadUnexpectedBlobs),
}
}

Expand Down Expand Up @@ -151,23 +153,30 @@ impl BlindedBlockProvider for RelayMux {
registrations: &[SignedValidatorRegistration],
) -> Result<(), Error> {
let responses = stream::iter(self.relays.iter().cloned())
.map(|relay| async move {
let response = relay.register_validators(registrations).await;
(relay, response)
.map(|relay| async {
let request = relay.register_validators(registrations);
let duration = Duration::from_secs(VALIDATOR_REGISTRATION_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, result)| async move {
match result {
Ok(Ok(_)) => Some(()),
Ok(Err(err)) => {
warn!(%err, %relay, "failure when registering validator(s)");
None
}
Err(_) => {
warn!(%relay, "timeout when registering validator(s)");
None
}
}
})
.collect::<Vec<_>>()
.await;

let mut num_failures = 0;
for (relay, response) in responses {
if let Err(err) = response {
num_failures += 1;
warn!(%relay, %err, "failed to register validator");
}
}

if num_failures == self.relays.len() {
if responses.is_empty() {
Err(BoostError::CouldNotRegister.into())
} else {
let count = registrations.len();
Expand All @@ -181,17 +190,15 @@ impl BlindedBlockProvider for RelayMux {
auction_request: &AuctionRequest,
) -> Result<SignedBuilderBid, Error> {
let bids = stream::iter(self.relays.iter().cloned())
.map(|relay| async move {
let response = tokio::time::timeout(
Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS),
relay.fetch_best_bid(auction_request),
)
.await;
(relay, response)
})
.map(|relay| async {
let request = relay.fetch_best_bid(auction_request);
let duration = Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, response)| async {
match response {
.filter_map(|(relay, result)| async {
match result {
Ok(Ok(bid)) => {
if let Err(err) = validate_bid(&bid, &relay.public_key, &self.context) {
warn!(%err, %relay, "invalid signed builder bid");
Expand Down Expand Up @@ -255,15 +262,7 @@ impl BlindedBlockProvider for RelayMux {

{
let mut state = self.state.lock();
let auction_context = AuctionContext {
slot,
relays: best_relays,
// TODO what value does this add?
blob_commitments: best_bid
.message
.blob_kzg_commitments()
.map(|commitments| commitments.to_vec()),
};
let auction_context = AuctionContext { slot, relays: best_relays };
state.outstanding_bids.insert(best_block_hash.clone(), Arc::new(auction_context));
}

Expand All @@ -280,20 +279,29 @@ impl BlindedBlockProvider for RelayMux {
let expected_block_hash = body.execution_payload_header().block_hash().clone();
let context = self.get_context(&expected_block_hash)?;

// TODO: avoid clone; move to join set
let responses = stream::iter(context.relays.clone())
let responses = stream::iter(context.relays.iter().cloned())
.map(|relay| async move {
let response = relay.open_bid(signed_block).await;
(relay, response)
let request = relay.open_bid(signed_block);
let duration = Duration::from_secs(FETCH_PAYLOAD_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, result)| async move {
match result {
Ok(response) => Some((relay, response)),
Err(_) => {
warn!( %relay, "timeout when opening bid");
None
}
}
})
.collect::<Vec<_>>()
.await;

for (relay, response) in responses.into_iter() {
match response {
Ok(auction_contents) => match validate_payload(
&context,
&auction_contents,
&expected_block_hash,
body.blob_kzg_commitments().map(|commitments| commitments.as_slice()),
Expand Down
9 changes: 7 additions & 2 deletions mev-rs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::types::AuctionRequest;
use beacon_api_client::Error as ApiError;
use ethereum_consensus::{
crypto::KzgCommitment,
primitives::{BlsPublicKey, ExecutionAddress, Hash32, ValidatorIndex},
Error as ConsensusError, Fork,
};
Expand All @@ -18,8 +19,12 @@ pub enum BoostError {
MissingPayload(Hash32),
#[error("returned payload block hash {provided} did not match expected {expected}")]
InvalidPayloadHash { expected: Hash32, provided: Hash32 },
#[error("signed block did not match the expected blob commitments")]
InvalidPayloadBlobs,
#[error("blobs provided when they were unexpected")]
InvalidPayloadUnexpectedBlobs,
#[error(
"signed block did not match the expected blob commitments ({expected:?} vs {provided:?})"
)]
InvalidPayloadBlobs { expected: Vec<KzgCommitment>, provided: Vec<KzgCommitment> },
}

#[derive(Debug, Error)]
Expand Down

0 comments on commit c06d555

Please sign in to comment.