Skip to content

Commit

Permalink
consolidate Builder functionality into Auctioneer
Browse files Browse the repository at this point in the history
  • Loading branch information
ralexstokes committed Apr 29, 2024
1 parent 397cf56 commit bffd32d
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 295 deletions.
234 changes: 164 additions & 70 deletions mev-build-rs/src/auctioneer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{
auction_schedule::{AuctionSchedule, Proposals},
bidder::{AuctionContext, BidRequest, DeadlineBidder},
builder::{KeepAlive, Message as BuilderMessage},
service::ClockMessage,
auction_schedule::{AuctionSchedule, Proposals, Proposer},
bidder::{AuctionContext, BidRequest, DeadlineBidder, Message as BidderMessage},
payload::builder_attributes::{BuilderPayloadBuilderAttributes, ProposalAttributes},
service::{ClockMessage, DEFAULT_COMPONENT_CHANNEL_SIZE},
utils::compat::{to_bytes20, to_bytes32, to_execution_payload},
Error,
};
use ethereum_consensus::{
clock::convert_timestamp_to_slot,
crypto::SecretKey,
primitives::{BlsPublicKey, Epoch, Slot},
state_transition::Context,
Expand All @@ -18,18 +19,31 @@ use mev_rs::{
BlindedBlockRelayer, Relay,
};
use reth::{
api::PayloadBuilderAttributes,
payload::{EthBuiltPayload, PayloadId},
api::{EngineTypes, PayloadBuilderAttributes},
payload::{EthBuiltPayload, Events, PayloadBuilderHandle, PayloadId, PayloadStore},
tasks::TaskExecutor,
};
use serde::Deserialize;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{
broadcast,
mpsc::{Receiver, Sender},
oneshot,
mpsc::{self, Receiver, Sender},
};
use tracing::{info, warn};
use tokio_stream::StreamExt;
use tracing::{error, info, warn};

fn make_attributes_for_proposer(
attributes: &BuilderPayloadBuilderAttributes,
proposer: &Proposer,
) -> BuilderPayloadBuilderAttributes {
let proposal = ProposalAttributes {
proposer_gas_limit: proposer.gas_limit,
proposer_fee_recipient: proposer.fee_recipient,
};
let mut attributes = attributes.clone();
attributes.attach_proposal(proposal);
attributes
}

fn prepare_submission(
payload: EthBuiltPayload,
Expand All @@ -56,111 +70,190 @@ fn prepare_submission(

#[derive(Deserialize, Debug, Default, Clone)]
pub struct Config {
/// Secret key used to sign builder messages to relay
pub secret_key: SecretKey,
#[serde(skip)]
/// Public key corresponding to secret key
pub public_key: BlsPublicKey,
/// List of relays to submit bids
pub relays: Vec<String>,
}

pub enum Message {
ProposalQuery(Slot, oneshot::Sender<Option<Proposals>>),
// TODO: can likely scope to just payload ids, as long as they are linked to corresponding
// proposals and keep `AuctionContext` local to here
NewAuctions(Vec<AuctionContext>),
BuiltPayload(EthBuiltPayload),
}

pub struct Auctioneer {
msgs: Receiver<Message>,
pub struct Auctioneer<
Engine: EngineTypes<
PayloadBuilderAttributes = BuilderPayloadBuilderAttributes,
BuiltPayload = EthBuiltPayload,
>,
> {
clock: broadcast::Receiver<ClockMessage>,
builder: Sender<BuilderMessage>,
builder: PayloadBuilderHandle<Engine>,
payload_store: PayloadStore<Engine>,
relays: Vec<Arc<Relay>>,
auction_schedule: AuctionSchedule,
open_auctions: HashMap<PayloadId, Arc<AuctionContext>>,
executor: TaskExecutor,
config: Config,
context: Arc<Context>,
// TODO consolidate this somewhere...
genesis_time: u64,
bidder_tx: Sender<BidderMessage>,
bidder: Receiver<BidderMessage>,

auction_schedule: AuctionSchedule,
open_auctions: HashMap<PayloadId, Arc<AuctionContext>>,
}

impl Auctioneer {
impl<
Engine: EngineTypes<
PayloadBuilderAttributes = BuilderPayloadBuilderAttributes,
BuiltPayload = EthBuiltPayload,
> + 'static,
> Auctioneer<Engine>
{
pub fn new(
msgs: Receiver<Message>,
clock: broadcast::Receiver<ClockMessage>,
builder: Sender<BuilderMessage>,
builder: PayloadBuilderHandle<Engine>,
executor: TaskExecutor,
mut config: Config,
context: Arc<Context>,
genesis_time: u64,
) -> Self {
let relays = parse_relay_endpoints(&config.relays)
.into_iter()
.map(|endpoint| Arc::new(Relay::from(endpoint)))
.collect::<Vec<_>>();

config.public_key = config.secret_key.public_key();

let payload_store = builder.clone().into();

let (bidder_tx, bidder) = mpsc::channel(DEFAULT_COMPONENT_CHANNEL_SIZE);

Self {
msgs,
clock,
builder,
payload_store,
relays,
auction_schedule: Default::default(),
open_auctions: Default::default(),
executor,
config,
context,
genesis_time,
bidder_tx,
bidder,
auction_schedule: Default::default(),
open_auctions: Default::default(),
}
}

async fn on_epoch(&mut self, epoch: Epoch) {
// TODO: parallel fetch, join set?
// TODO: batch updates to auction schedule
// TODO: consider fast data access once this stabilizes
for relay in self.relays.iter() {
match relay.get_proposal_schedule().await {
Ok(schedule) => {
let slots = self.auction_schedule.process(relay.clone(), &schedule);
info!(epoch, ?slots, %relay, "processed proposer schedule");
}
Err(err) => {
warn!(err = %err, "error fetching proposer schedule from relay")
}
}
}

// NOTE: clear stale state
let slot = epoch * self.context.slots_per_epoch;
self.auction_schedule.clear(slot);
self.open_auctions.retain(|_, auction| auction.slot >= slot);
}

fn take_proposals(&mut self, slot: Slot) -> Option<Proposals> {
self.auction_schedule.take_matching_proposals(slot)
}

async fn process_proposals(
&self,
slot: Slot,
attributes: BuilderPayloadBuilderAttributes,
proposals: Proposals,
) -> Vec<AuctionContext> {
let mut new_auctions = vec![];
for (proposer, relays) in proposals {
let attributes = make_attributes_for_proposer(&attributes, &proposer);

if self.start_build(&attributes).await.is_some() {
// TODO: can likely skip full attributes in `AuctionContext`
// TODO: consider data layout here...
let auction = AuctionContext { slot, attributes, proposer, relays };
new_auctions.push(auction);
}
}
new_auctions
}

async fn start_build(&self, attributes: &BuilderPayloadBuilderAttributes) -> Option<PayloadId> {
// TODO: necessary to get response, other than no error?
match self.builder.new_payload(attributes.clone()).await {
Ok(payload_id) => {
let attributes_payload_id = attributes.payload_id();
if payload_id != attributes_payload_id {
error!(%payload_id, %attributes_payload_id, "mismatch between computed payload id and the one returned by the payload builder");
}
Some(payload_id)
}
Err(err) => {
warn!(%err, "builder could not start build with payload builder");
None
}
}
}

fn process_new_auction(&mut self, auction: AuctionContext) {
let payload_id = auction.attributes.payload_id();
self.open_auctions.insert(payload_id, Arc::new(auction));
let auction = self.open_auctions.get(&payload_id).unwrap().clone();
// TODO: consider data layout in `open_auctions`
let auction = self.open_auctions.entry(payload_id).or_insert_with(|| Arc::new(auction));

let builder = self.builder.clone();
// TODO refactor into independent actor
// this works for now, but want bidding to happen on separate thread
let auctioneer = self.bidder_tx.clone();
let auction = auction.clone();
self.executor.spawn_blocking(async move {
let deadline = Duration::from_secs(1);
let bidder = DeadlineBidder::new(deadline);
match bidder.make_bid(&auction).await {
BidRequest::Ready(payload_id) => {
builder
.send(BuilderMessage::FetchPayload(payload_id, KeepAlive::No))
.await
.expect("can send");
BidRequest::Ready(payload_id, keep_alive) => {
auctioneer.send((payload_id, keep_alive)).await.expect("can send");
}
}
});
}

fn process_new_auctions(&mut self, auctions: Vec<AuctionContext>) {
for auction in auctions {
self.process_new_auction(auction);
async fn on_payload_attributes(&mut self, attributes: BuilderPayloadBuilderAttributes) {
// TODO: ignore already processed attributes

let slot = convert_timestamp_to_slot(
attributes.timestamp(),
self.genesis_time,
self.context.seconds_per_slot,
)
.expect("is past genesis");
// TODO: consolidate once stable
if let Some(proposals) = self.take_proposals(slot) {
let auctions = self.process_proposals(slot, attributes, proposals).await;
for auction in auctions {
self.process_new_auction(auction);
}
}
}

async fn on_epoch(&mut self, epoch: Epoch) {
// TODO: concurrent fetch
// TODO: batch updates to auction schedule
for relay in self.relays.iter() {
match relay.get_proposal_schedule().await {
Ok(schedule) => {
let slots = self.auction_schedule.process(relay.clone(), &schedule);
info!(epoch, ?slots, %relay, "processed proposer schedule");
}
Err(err) => {
warn!(err = %err, "error fetching proposer schedule from relay")
}
async fn process_bid(&mut self, (payload_id, _keep_alive): BidderMessage) {
// TODO: may want to keep payload job running...
if let Some(payload) = self.payload_store.resolve(payload_id).await {
match payload {
Ok(payload) => self.submit_payload(payload).await,
Err(err) => warn!(%err, "payload resolution failed"),
}
} else {
warn!(%payload_id, "no payload could be retrieved from payload store for bid")
}

let slot = epoch * self.context.slots_per_epoch;
self.auction_schedule.clear(slot);

self.open_auctions.retain(|_, auction| auction.slot >= slot);
}

async fn submit_payload(&self, payload: EthBuiltPayload) {
Expand All @@ -183,6 +276,7 @@ impl Auctioneer {
) {
Ok(signed_submission) => {
let relays = &auction.relays;
// TODO: parallel dispatch
for relay in relays {
if let Err(err) = relay.submit_bid(&signed_submission).await {
warn!(%err, ?relay, slot = auction.slot, "could not submit payload");
Expand All @@ -195,21 +289,14 @@ impl Auctioneer {
}
}

async fn dispatch(&mut self, message: Message) {
use Message::*;
match message {
ProposalQuery(slot, tx) => {
let proposals = self.take_proposals(slot);
tx.send(proposals).expect("can send");
}
NewAuctions(auctions) => self.process_new_auctions(auctions),
BuiltPayload(payload) => self.submit_payload(payload).await,
}
async fn dispatch_clock(&mut self, message: ClockMessage) {
let ClockMessage::NewEpoch(epoch) = message;
self.on_epoch(epoch).await;
}

async fn dispatch_clock(&mut self, message: ClockMessage) {
if let ClockMessage::NewEpoch(epoch) = message {
self.on_epoch(epoch).await;
async fn dispatch_payload_event(&mut self, event: Events<Engine>) {
if let Events::Attributes(attributes) = event {
self.on_payload_attributes(attributes).await;
}
}

Expand All @@ -224,10 +311,17 @@ impl Auctioneer {
}
}

let mut payload_events =
self.builder.subscribe().await.expect("can subscribe to events").into_stream();

loop {
tokio::select! {
Some(message) = self.msgs.recv() => self.dispatch(message).await,
Ok(message) = self.clock.recv() => self.dispatch_clock(message).await,
Some(event) = payload_events.next() => match event {
Ok(event) => self.dispatch_payload_event(event).await,
Err(err) => warn!(%err, "error getting payload event"),
},
Some(message) = self.bidder.recv() => self.process_bid(message).await,
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions mev-build-rs/src/bidder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ use reth::{api::PayloadBuilderAttributes, payload::PayloadId};
use std::time::Duration;
use tokio::time::sleep;

pub type Message = (PayloadId, KeepAlive);

#[derive(Debug)]
pub enum KeepAlive {
No,
}

#[derive(Debug)]
pub struct AuctionContext {
pub slot: Slot,
Expand All @@ -25,7 +32,7 @@ pub struct DeadlineBidder {
}

pub enum BidRequest {
Ready(PayloadId),
Ready(PayloadId, KeepAlive),
}

impl DeadlineBidder {
Expand All @@ -37,6 +44,6 @@ impl DeadlineBidder {
let target = duration_until(auction.attributes.timestamp());
let duration = target.checked_sub(self.deadline).unwrap_or_default();
sleep(duration).await;
BidRequest::Ready(auction.attributes.payload_id())
BidRequest::Ready(auction.attributes.payload_id(), KeepAlive::No)
}
}
Loading

0 comments on commit bffd32d

Please sign in to comment.