
Commit

Merge pull request #232 from ralexstokes/debug-proposer-scheduling
bugfix: fetch relay proposer schedules more frequently
ralexstokes authored May 2, 2024
2 parents fea9aae + ae022d2 commit 8b75efe
Showing 2 changed files with 35 additions and 7 deletions.
30 changes: 24 additions & 6 deletions mev-build-rs/src/auctioneer/service.rs
@@ -33,7 +33,11 @@ use tokio::sync::{
     mpsc::{Receiver, Sender},
 };
 use tokio_stream::StreamExt;
-use tracing::{error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn};
+
+// Fetch new proposer schedules from all connected relays at this period into the epoch
+// E.g. a value of `2` corresponds to being half-way into the epoch.
+const PROPOSAL_SCHEDULE_INTERVAL: u64 = 2;
 
 fn make_attributes_for_proposer(
     attributes: &BuilderPayloadBuilderAttributes,
@@ -176,22 +180,33 @@ impl<
         }
     }
 
-    async fn on_epoch(&mut self, epoch: Epoch) {
-        // TODO: parallel fetch, join set?
+    async fn fetch_proposer_schedules(&mut self, slot: Slot) {
+        // TODO: consider moving to new task on another thread, can do parallel fetch (join set)
+        // and not block others at this interval
         // TODO: batch updates to auction schedule
         // TODO: consider fast data access once this stabilizes
         for relay in self.relays.iter() {
             match relay.get_proposal_schedule().await {
                 Ok(schedule) => {
                     let slots = self.auction_schedule.process(relay.clone(), &schedule);
-                    info!(epoch, ?slots, %relay, "processed proposer schedule");
+                    info!(slot, ?slots, %relay, "processed proposer schedule");
                 }
                 Err(err) => {
                     warn!(err = %err, "error fetching proposer schedule from relay")
                 }
             }
         }
+    }
+
+    async fn on_slot(&mut self, slot: Slot) {
+        debug!(slot, "processed");
+        if (slot * PROPOSAL_SCHEDULE_INTERVAL) % self.context.slots_per_epoch == 0 {
+            self.fetch_proposer_schedules(slot).await;
+        }
+    }
+
+    async fn on_epoch(&mut self, epoch: Epoch) {
+        debug!(epoch, "processed");
         // NOTE: clear stale state
         let retain_slot = epoch * self.context.slots_per_epoch;
         self.auction_schedule.clear(retain_slot);
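
Note on the cadence introduced above: the guard (slot * PROPOSAL_SCHEDULE_INTERVAL) % slots_per_epoch == 0 holds exactly when slot is a multiple of slots_per_epoch / PROPOSAL_SCHEDULE_INTERVAL, so with mainnet's 32 slots per epoch (an assumed value here; the code reads it from self.context.slots_per_epoch) schedules are fetched at the epoch boundary and again halfway through, rather than once per epoch as before. A standalone sketch of the arithmetic:

const PROPOSAL_SCHEDULE_INTERVAL: u64 = 2;
// Assumed mainnet value, standing in for `self.context.slots_per_epoch`.
const SLOTS_PER_EPOCH: u64 = 32;

fn should_fetch(slot: u64) -> bool {
    (slot * PROPOSAL_SCHEDULE_INTERVAL) % SLOTS_PER_EPOCH == 0
}

fn main() {
    // Prints slots 0, 16, 32, 48: two fetches per 32-slot epoch.
    for slot in 0..64 {
        if should_fetch(slot) {
            println!("fetch proposer schedules at slot {slot}");
        }
    }
}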
@@ -351,8 +366,11 @@ impl<
     }
 
     async fn process_clock(&mut self, message: ClockMessage) {
-        let ClockMessage::NewEpoch(epoch) = message;
-        self.on_epoch(epoch).await;
+        use ClockMessage::*;
+        match message {
+            NewSlot(slot) => self.on_slot(slot).await,
+            NewEpoch(epoch) => self.on_epoch(epoch).await,
+        }
     }
 
     async fn process_payload_event(&mut self, event: Events<Engine>) {
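
The first TODO in fetch_proposer_schedules suggests running the relay calls in parallel via a join set instead of awaiting each relay in turn. A sketch of what that could look like with tokio::task::JoinSet, using hypothetical Relay and Schedule stand-ins since the real relay client types are outside this diff:

use std::sync::Arc;
use tokio::task::JoinSet;

// Hypothetical stand-ins for the real relay client and schedule types.
#[derive(Clone, Debug)]
struct Relay(String);

#[derive(Debug)]
struct Schedule;

impl Relay {
    async fn get_proposal_schedule(&self) -> Result<Schedule, String> {
        Ok(Schedule) // a real client would query the relay's API here
    }
}

#[tokio::main]
async fn main() {
    let relays = vec![Arc::new(Relay("relay-a".into())), Arc::new(Relay("relay-b".into()))];

    // Fan the fetches out so one slow relay does not serialize the others.
    let mut set = JoinSet::new();
    for relay in &relays {
        let relay = Arc::clone(relay);
        set.spawn(async move {
            let result = relay.get_proposal_schedule().await;
            (relay, result)
        });
    }

    // Drain results as they complete, mirroring the sequential match arms.
    while let Some(joined) = set.join_next().await {
        match joined {
            Ok((relay, Ok(schedule))) => println!("{relay:?}: {schedule:?}"),
            Ok((relay, Err(err))) => eprintln!("{relay:?} failed: {err}"),
            Err(join_err) => eprintln!("task panicked: {join_err}"),
        }
    }
}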
12 changes: 11 additions & 1 deletion mev-build-rs/src/service.rs
@@ -7,7 +7,8 @@ use crate::{
     },
 };
 use ethereum_consensus::{
-    clock::SystemClock, networks::Network, primitives::Epoch, state_transition::Context,
+    clock::SystemClock, deneb::Slot, networks::Network, primitives::Epoch,
+    state_transition::Context,
 };
 use eyre::OptionExt;
 use mev_rs::{get_genesis_time, Error};
@@ -149,12 +150,20 @@ pub async fn launch(
 
     // TODO: block on sync here to avoid spurious first PA?
 
+    if let Err(err) = clock_tx.send(ClockMessage::NewSlot(current_slot)) {
+        let msg = err.0;
+        warn!(?msg, "could not update receivers with new slot")
+    }
     if let Err(err) = clock_tx.send(ClockMessage::NewEpoch(current_epoch)) {
         let msg = err.0;
         warn!(?msg, "could not update receivers with new epoch");
     }
 
     while let Some(slot) = slots.next().await {
+        if let Err(err) = clock_tx.send(ClockMessage::NewSlot(slot)) {
+            let msg = err.0;
+            warn!(?msg, "could not update receivers with new slot")
+        }
         let epoch = clock.epoch_for(slot);
         if epoch > current_epoch {
             current_epoch = epoch;
@@ -171,5 +180,6 @@
 
 #[derive(Debug, Clone)]
 pub enum ClockMessage {
+    NewSlot(Slot),
     NewEpoch(Epoch),
 }
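
The err.0 destructuring on the send errors above is consistent with a tokio broadcast channel, whose SendError is a tuple struct wrapping the undelivered message; the channel construction is outside this diff, so take this as an inferred sketch of the clock fan-out. Slot and Epoch are integer aliases upstream, so plain u64 stands in here:

use tokio::sync::broadcast;

// Mirrors the enum from this commit, with u64 standing in for Slot/Epoch.
#[derive(Debug, Clone)]
enum ClockMessage {
    NewSlot(u64),
    NewEpoch(u64),
}

#[tokio::main]
async fn main() {
    let (clock_tx, mut clock_rx) = broadcast::channel(16);

    // A subscriber such as the auctioneer matches on the message,
    // as `process_clock` does after this change.
    let handle = tokio::spawn(async move {
        while let Ok(message) = clock_rx.recv().await {
            match message {
                ClockMessage::NewSlot(slot) => println!("new slot {slot}"),
                ClockMessage::NewEpoch(epoch) => println!("new epoch {epoch}"),
            }
        }
    });

    for msg in [ClockMessage::NewSlot(0), ClockMessage::NewEpoch(0)] {
        if let Err(err) = clock_tx.send(msg) {
            let msg = err.0; // broadcast::error::SendError wraps the message
            eprintln!("could not update receivers: {msg:?}");
        }
    }

    drop(clock_tx); // closing the channel ends the receiver loop
    handle.await.unwrap();
}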
