From ae022d265dd22f7e43cacdf1d3cd5bd1021549eb Mon Sep 17 00:00:00 2001
From: Alex Stokes
Date: Thu, 2 May 2024 10:37:27 -0600
Subject: [PATCH] bugfix: fetch relay proposer schedules more frequently

avoids a race condition at epoch boundary where builder is faster than
relay to update
---
 mev-build-rs/src/auctioneer/service.rs | 30 ++++++++++++++++++++------
 mev-build-rs/src/service.rs            | 12 ++++++++++-
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/mev-build-rs/src/auctioneer/service.rs b/mev-build-rs/src/auctioneer/service.rs
index 75941d35..c7e64373 100644
--- a/mev-build-rs/src/auctioneer/service.rs
+++ b/mev-build-rs/src/auctioneer/service.rs
@@ -33,7 +33,11 @@ use tokio::sync::{
     mpsc::{Receiver, Sender},
 };
 use tokio_stream::StreamExt;
-use tracing::{error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn};
+
+// Fetch new proposer schedules from all connected relays at this period into the epoch
+// E.g. a value of `2` corresponds to being half-way into the epoch.
+const PROPOSAL_SCHEDULE_INTERVAL: u64 = 2;
 
 fn make_attributes_for_proposer(
     attributes: &BuilderPayloadBuilderAttributes,
@@ -176,22 +180,33 @@ impl<
         }
     }
 
-    async fn on_epoch(&mut self, epoch: Epoch) {
-        // TODO: parallel fetch, join set?
+    async fn fetch_proposer_schedules(&mut self, slot: Slot) {
+        // TODO: consider moving to new task on another thread, can do parallel fetch (join set)
+        // and not block others at this interval
         // TODO: batch updates to auction schedule
         // TODO: consider fast data access once this stabilizes
         for relay in self.relays.iter() {
             match relay.get_proposal_schedule().await {
                 Ok(schedule) => {
                     let slots = self.auction_schedule.process(relay.clone(), &schedule);
-                    info!(epoch, ?slots, %relay, "processed proposer schedule");
+                    info!(slot, ?slots, %relay, "processed proposer schedule");
                 }
                 Err(err) => {
                     warn!(err = %err, "error fetching proposer schedule from relay")
                 }
             }
         }
+    }
+    async fn on_slot(&mut self, slot: Slot) {
+        debug!(slot, "processed");
+        if (slot * PROPOSAL_SCHEDULE_INTERVAL) % self.context.slots_per_epoch == 0 {
+            self.fetch_proposer_schedules(slot).await;
+        }
+    }
+
+    async fn on_epoch(&mut self, epoch: Epoch) {
+        debug!(epoch, "processed");
 
         // NOTE: clear stale state
         let retain_slot = epoch * self.context.slots_per_epoch;
         self.auction_schedule.clear(retain_slot);
@@ -351,8 +366,11 @@ impl<
     }
 
     async fn process_clock(&mut self, message: ClockMessage) {
-        let ClockMessage::NewEpoch(epoch) = message;
-        self.on_epoch(epoch).await;
+        use ClockMessage::*;
+        match message {
+            NewSlot(slot) => self.on_slot(slot).await,
+            NewEpoch(epoch) => self.on_epoch(epoch).await,
+        }
     }
 
     async fn process_payload_event(&mut self, event: Events) {
diff --git a/mev-build-rs/src/service.rs b/mev-build-rs/src/service.rs
index 4a1c563f..e27ba093 100644
--- a/mev-build-rs/src/service.rs
+++ b/mev-build-rs/src/service.rs
@@ -7,7 +7,8 @@ use crate::{
     },
 };
 use ethereum_consensus::{
-    clock::SystemClock, networks::Network, primitives::Epoch, state_transition::Context,
+    clock::SystemClock, deneb::Slot, networks::Network, primitives::Epoch,
+    state_transition::Context,
 };
 use eyre::OptionExt;
 use mev_rs::{get_genesis_time, Error};
@@ -149,12 +150,20 @@ pub async fn launch(
         let current_slot = slots.next().await.expect("some next slot");
         let mut current_epoch = clock.epoch_for(current_slot);
         // TODO: block on sync here to avoid spurious first PA?
+        if let Err(err) = clock_tx.send(ClockMessage::NewSlot(current_slot)) {
+            let msg = err.0;
+            warn!(?msg, "could not update receivers with new slot")
+        }
         if let Err(err) = clock_tx.send(ClockMessage::NewEpoch(current_epoch)) {
             let msg = err.0;
             warn!(?msg, "could not update receivers with new epoch");
         }
 
         while let Some(slot) = slots.next().await {
+            if let Err(err) = clock_tx.send(ClockMessage::NewSlot(slot)) {
+                let msg = err.0;
+                warn!(?msg, "could not update receivers with new slot")
+            }
             let epoch = clock.epoch_for(slot);
             if epoch > current_epoch {
                 current_epoch = epoch;
@@ -171,5 +180,6 @@ pub async fn launch(
 
 #[derive(Debug, Clone)]
 pub enum ClockMessage {
+    NewSlot(Slot),
     NewEpoch(Epoch),
 }
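
A note for reviewers on the new scheduling arithmetic. With
PROPOSAL_SCHEDULE_INTERVAL = 2, the predicate
(slot * PROPOSAL_SCHEDULE_INTERVAL) % slots_per_epoch == 0 holds twice per
epoch: at the epoch boundary and at the half-way slot. The epoch tick now only
clears stale state, while the slot tick drives relay fetches, so the boundary
fetch is preserved and the mid-epoch fetch picks up schedules a relay
published late. A minimal standalone sketch of the predicate (not part of the
patch; SLOTS_PER_EPOCH is hard-coded to mainnet's 32 here, whereas the builder
reads the real value from self.context.slots_per_epoch):

    // Standalone model of the mid-epoch trigger in auctioneer/service.rs.
    const PROPOSAL_SCHEDULE_INTERVAL: u64 = 2;
    const SLOTS_PER_EPOCH: u64 = 32; // assumed mainnet value

    // True on slots where proposer schedules should be refetched from relays.
    fn should_fetch(slot: u64) -> bool {
        (slot * PROPOSAL_SCHEDULE_INTERVAL) % SLOTS_PER_EPOCH == 0
    }

    fn main() {
        // With an interval of 2, the fetch fires at the epoch boundary
        // (slot % 32 == 0) and half-way through the epoch (slot % 32 == 16).
        let firing: Vec<u64> = (0..64).filter(|&slot| should_fetch(slot)).collect();
        assert_eq!(firing, vec![0, 16, 32, 48]);
        println!("fetch slots across the first two epochs: {firing:?}");
    }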
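On the service side, the clock task now broadcasts a NewSlot message before
any NewEpoch, so subscribers observe the slot tick first at an epoch boundary.
A self-contained sketch of that fan-out, assuming a tokio::sync::broadcast
channel (consistent with the err.0 handling above, since
broadcast::error::SendError is a tuple struct wrapping the undelivered
message); the receiver side mirrors process_clock in the auctioneer:

    use tokio::sync::broadcast;

    #[derive(Debug, Clone)]
    enum ClockMessage {
        NewSlot(u64),
        NewEpoch(u64),
    }

    #[tokio::main]
    async fn main() {
        let (clock_tx, mut clock_rx) = broadcast::channel(16);

        // `send` fails only when no receivers remain; the undelivered
        // message comes back in `err.0`.
        for message in [ClockMessage::NewSlot(32), ClockMessage::NewEpoch(1)] {
            if let Err(err) = clock_tx.send(message) {
                let msg = err.0;
                eprintln!("could not update receivers: {msg:?}");
            }
        }

        // Dispatch on the variant, as `process_clock` does.
        while let Ok(message) = clock_rx.try_recv() {
            match message {
                ClockMessage::NewSlot(slot) => println!("on_slot({slot})"),
                ClockMessage::NewEpoch(epoch) => println!("on_epoch({epoch})"),
            }
        }
    }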