Skip to content

Commit

Permalink
Merge pull request #255 from ralexstokes/boost/actor-refactor
Browse files Browse the repository at this point in the history
misc refactoring for boost
  • Loading branch information
ralexstokes authored May 12, 2024
2 parents 44101b0 + 93ff3f3 commit 711d04d
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 119 deletions.
4 changes: 3 additions & 1 deletion bin/mev/src/cmd/boost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ impl Command {
info!("configured for `{network}`");

if let Some(config) = config.boost {
Ok(Service::from(network, config).spawn().await?.await?)
let service = Service::from(network, config);
let handle = service.spawn()?;
Ok(handle.await?)
} else {
Err(eyre::eyre!("missing boost config from file provided"))
}
Expand Down
215 changes: 125 additions & 90 deletions mev-boost-rs/src/relay_mux.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use ethereum_consensus::{
primitives::{BlsPublicKey, Epoch, Slot, U256},
crypto::KzgCommitment,
primitives::{BlsPublicKey, Hash32, Slot, U256},
state_transition::Context,
};
use futures_util::{stream, StreamExt};
Expand All @@ -16,23 +17,22 @@ use mev_rs::{
use parking_lot::Mutex;
use rand::prelude::*;
use std::{cmp::Ordering, collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use tokio::time::timeout;
use tracing::{debug, info, warn};

// See note in the `mev-relay-rs::Relay` about this constant.
// TODO likely drop this feature...
const PROPOSAL_TOLERANCE_DELAY: Slot = 1;
// Track an auction for this amount of time, in slots.
const AUCTION_LIFETIME: u64 = 2;
// Give relays this amount of time in seconds to process validator registrations.
const VALIDATOR_REGISTRATION_TIME_OUT_SECS: u64 = 4;
// Give relays this amount of time in seconds to return bids.
const FETCH_BEST_BID_TIME_OUT_SECS: u64 = 1;
// Give relays this amount of time in seconds to respond with a payload.
const FETCH_PAYLOAD_TIME_OUT_SECS: u64 = 1;

fn bid_key_from(
signed_block: &SignedBlindedBeaconBlock,
public_key: &BlsPublicKey,
) -> AuctionRequest {
let slot = signed_block.message().slot();
let parent_hash =
signed_block.message().body().execution_payload_header().parent_hash().clone();

AuctionRequest { slot, parent_hash, public_key: public_key.clone() }
#[derive(Debug)]
struct AuctionContext {
slot: Slot,
relays: Vec<Arc<Relay>>,
}

fn validate_bid(
Expand All @@ -52,6 +52,35 @@ fn validate_bid(
.map_err(Into::into)
}

fn validate_payload(
contents: &AuctionContents,
expected_block_hash: &Hash32,
expected_commitments: Option<&[KzgCommitment]>,
) -> Result<(), BoostError> {
let provided_block_hash = contents.execution_payload().block_hash();
if expected_block_hash != provided_block_hash {
return Err(BoostError::InvalidPayloadHash {
expected: expected_block_hash.clone(),
provided: provided_block_hash.clone(),
})
}
let provided_commitments = contents.blobs_bundle().map(|bundle| &bundle.commitments);
match (expected_commitments, provided_commitments) {
(Some(expected), Some(provided)) => {
if expected == provided.as_ref() {
Ok(())
} else {
Err(BoostError::InvalidPayloadBlobs {
expected: expected.to_vec(),
provided: provided.to_vec(),
})
}
}
(None, None) => Ok(()),
_ => Err(BoostError::InvalidPayloadUnexpectedBlobs),
}
}

// Select the most valuable bids in `bids`, breaking ties by `block_hash`
fn select_best_bids(bids: impl Iterator<Item = (usize, U256)>) -> Vec<usize> {
let (best_indices, _value) =
Expand Down Expand Up @@ -81,42 +110,39 @@ impl Deref for RelayMux {

pub struct Inner {
relays: Vec<Arc<Relay>>,
context: Context,
context: Arc<Context>,
state: Mutex<State>,
}

#[derive(Debug, Default)]
struct State {
// map from bid requests to index of `Relay` in collection
outstanding_bids: HashMap<AuctionRequest, Vec<Arc<Relay>>>,
current_epoch_registration_count: usize,
latest_pubkey: BlsPublicKey,
outstanding_bids: HashMap<Hash32, Arc<AuctionContext>>,
}

impl RelayMux {
pub fn new(relays: impl Iterator<Item = Relay>, context: Context) -> Self {
let inner =
Inner { relays: relays.map(Arc::new).collect(), context, state: Default::default() };
pub fn new(relays: Vec<Relay>, context: Arc<Context>) -> Self {
let inner = Inner {
relays: relays.into_iter().map(Arc::new).collect(),
context,
state: Default::default(),
};
Self(Arc::new(inner))
}

pub fn on_slot(&self, slot: Slot) {
debug!(slot, "processing");
let retain_slot = slot - AUCTION_LIFETIME;
let mut state = self.state.lock();
state
.outstanding_bids
.retain(|auction_request, _| auction_request.slot + PROPOSAL_TOLERANCE_DELAY >= slot);
state.outstanding_bids.retain(|_, auction| auction.slot >= retain_slot);
}

pub fn on_epoch(&self, epoch: Epoch) {
debug!(epoch, "processing");
let count = {
let mut state = self.state.lock();
let count = state.current_epoch_registration_count;
state.current_epoch_registration_count = 0;
count
};
info!(count, epoch, "processed validator registrations")
fn get_context(&self, key: &Hash32) -> Result<Arc<AuctionContext>, Error> {
let state = self.state.lock();
state
.outstanding_bids
.get(key)
.cloned()
.ok_or_else::<Error, _>(|| BoostError::MissingOpenBid(key.clone()).into())
}
}

Expand All @@ -128,28 +154,33 @@ impl BlindedBlockProvider for RelayMux {
) -> Result<(), Error> {
let responses = stream::iter(self.relays.iter().cloned())
.map(|relay| async {
let response = relay.register_validators(registrations).await;
(relay, response)
let request = relay.register_validators(registrations);
let duration = Duration::from_secs(VALIDATOR_REGISTRATION_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, result)| async move {
match result {
Ok(Ok(_)) => Some(()),
Ok(Err(err)) => {
warn!(%err, %relay, "failure when registering validator(s)");
None
}
Err(_) => {
warn!(%relay, "timeout when registering validator(s)");
None
}
}
})
.collect::<Vec<_>>()
.await;

let mut num_failures = 0;
for (relay, response) in responses {
if let Err(err) = response {
num_failures += 1;
warn!(%relay, %err, "failed to register validator");
}
}

if num_failures == self.relays.len() {
if responses.is_empty() {
Err(BoostError::CouldNotRegister.into())
} else {
let count = registrations.len();
info!(count, "sent validator registrations");
let mut state = self.state.lock();
state.current_epoch_registration_count += registrations.len();
Ok(())
}
}
Expand All @@ -160,16 +191,14 @@ impl BlindedBlockProvider for RelayMux {
) -> Result<SignedBuilderBid, Error> {
let bids = stream::iter(self.relays.iter().cloned())
.map(|relay| async {
let response = tokio::time::timeout(
Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS),
relay.fetch_best_bid(auction_request),
)
.await;
(relay, response)
})
let request = relay.fetch_best_bid(auction_request);
let duration = Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, response)| async {
match response {
.filter_map(|(relay, result)| async {
match result {
Ok(Ok(bid)) => {
if let Err(err) = validate_bid(&bid, &relay.public_key, &self.context) {
warn!(%err, %relay, "invalid signed builder bid");
Expand Down Expand Up @@ -221,19 +250,20 @@ impl BlindedBlockProvider for RelayMux {
}
}

let relays_desc = best_relays
.iter()
.map(|relay| format!("{relay}"))
.reduce(|desc, next| format!("{desc}, {next}"))
.expect("at least one relay");
info!(%auction_request, %best_bid, relays=relays_desc, "acquired best bid");
let slot = auction_request.slot;
info!(
slot,
parent_hash = ?auction_request.parent_hash,
public_key = ?auction_request.public_key,
%best_bid,
relays = ?best_relays,
"acquired best bid"
);

{
let mut state = self.state.lock();
// assume the next request to open a bid corresponds to the current request
// TODO consider if the relay mux should have more knowledge about the proposal
state.latest_pubkey = auction_request.public_key.clone();
state.outstanding_bids.insert(auction_request.clone(), best_relays);
let auction_context = AuctionContext { slot, relays: best_relays };
state.outstanding_bids.insert(best_block_hash.clone(), Arc::new(auction_context));
}

Ok(best_bid.clone())
Expand All @@ -243,42 +273,47 @@ impl BlindedBlockProvider for RelayMux {
&self,
signed_block: &SignedBlindedBeaconBlock,
) -> Result<AuctionContents, Error> {
let (auction_request, relays) = {
let mut state = self.state.lock();
let key = bid_key_from(signed_block, &state.latest_pubkey);
// TODO: do not `remove` so this endpoint can be retried
let relays = state
.outstanding_bids
.remove(&key)
.ok_or_else::<Error, _>(|| BoostError::MissingOpenBid.into())?;
(key, relays)
};
let block = signed_block.message();
let slot = block.slot();
let body = block.body();
let expected_block_hash = body.execution_payload_header().block_hash().clone();
let context = self.get_context(&expected_block_hash)?;

let signed_block = &signed_block;
let responses = stream::iter(relays)
let responses = stream::iter(context.relays.iter().cloned())
.map(|relay| async move {
let response = relay.open_bid(signed_block).await;
(relay, response)
let request = relay.open_bid(signed_block);
let duration = Duration::from_secs(FETCH_PAYLOAD_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, result)| async move {
match result {
Ok(response) => Some((relay, response)),
Err(_) => {
warn!( %relay, "timeout when opening bid");
None
}
}
})
.collect::<Vec<_>>()
.await;

let block = signed_block.message();
let block_body = block.body();
let payload_header = block_body.execution_payload_header();
let expected_block_hash = payload_header.block_hash();
for (relay, response) in responses.into_iter() {
match response {
Ok(auction_contents) => {
let block_hash = auction_contents.execution_payload().block_hash();
if block_hash == expected_block_hash {
info!(%auction_request, %block_hash, %relay, "acquired payload");
Ok(auction_contents) => match validate_payload(
&auction_contents,
&expected_block_hash,
body.blob_kzg_commitments().map(|commitments| commitments.as_slice()),
) {
Ok(_) => {
info!(%slot, block_hash = %expected_block_hash, %relay, "acquired payload");
return Ok(auction_contents)
} else {
warn!(?block_hash, ?expected_block_hash, %relay, "incorrect block hash delivered by relay");
}
}
Err(err) => {
warn!(?err, ?relay, "could not validate payload");
}
},
Err(err) => {
warn!(%err, %relay, "error opening bid");
}
Expand Down
Loading

0 comments on commit 711d04d

Please sign in to comment.