diff --git a/mev-build-rs/src/auctioneer.rs b/mev-build-rs/src/auctioneer.rs
index 5517601e..a3ad809b 100644
--- a/mev-build-rs/src/auctioneer.rs
+++ b/mev-build-rs/src/auctioneer.rs
@@ -1,12 +1,13 @@
 use crate::{
-    auction_schedule::{AuctionSchedule, Proposals},
-    bidder::{AuctionContext, BidRequest, DeadlineBidder},
-    builder::{KeepAlive, Message as BuilderMessage},
-    service::ClockMessage,
+    auction_schedule::{AuctionSchedule, Proposals, Proposer},
+    bidder::{AuctionContext, BidRequest, DeadlineBidder, Message as BidderMessage},
+    payload::builder_attributes::{BuilderPayloadBuilderAttributes, ProposalAttributes},
+    service::{ClockMessage, DEFAULT_COMPONENT_CHANNEL_SIZE},
     utils::compat::{to_bytes20, to_bytes32, to_execution_payload},
     Error,
 };
 use ethereum_consensus::{
+    clock::convert_timestamp_to_slot,
     crypto::SecretKey,
     primitives::{BlsPublicKey, Epoch, Slot},
     state_transition::Context,
@@ -18,18 +19,31 @@ use mev_rs::{
     BlindedBlockRelayer, Relay,
 };
 use reth::{
-    api::PayloadBuilderAttributes,
-    payload::{EthBuiltPayload, PayloadId},
+    api::{EngineTypes, PayloadBuilderAttributes},
+    payload::{EthBuiltPayload, Events, PayloadBuilderHandle, PayloadId, PayloadStore},
     tasks::TaskExecutor,
 };
 use serde::Deserialize;
 use std::{collections::HashMap, sync::Arc, time::Duration};
 use tokio::sync::{
     broadcast,
-    mpsc::{Receiver, Sender},
-    oneshot,
+    mpsc::{self, Receiver, Sender},
 };
-use tracing::{info, warn};
+use tokio_stream::StreamExt;
+use tracing::{error, info, warn};
+
+fn make_attributes_for_proposer(
+    attributes: &BuilderPayloadBuilderAttributes,
+    proposer: &Proposer,
+) -> BuilderPayloadBuilderAttributes {
+    let proposal = ProposalAttributes {
+        proposer_gas_limit: proposer.gas_limit,
+        proposer_fee_recipient: proposer.fee_recipient,
+    };
+    let mut attributes = attributes.clone();
+    attributes.attach_proposal(proposal);
+    attributes
+}
 
 fn prepare_submission(
     payload: EthBuiltPayload,
@@ -56,40 +70,51 @@
 #[derive(Deserialize, Debug, Default, Clone)]
 pub struct Config {
+    /// Secret key used to sign builder messages to relay
     pub secret_key: SecretKey,
     #[serde(skip)]
+    /// Public key corresponding to secret key
     pub public_key: BlsPublicKey,
+    /// List of relays to submit bids
     pub relays: Vec<String>,
 }
 
-pub enum Message {
-    ProposalQuery(Slot, oneshot::Sender<Option<Proposals>>),
-    // TODO: can likely scope to just payload ids, as long as they are linked to corresponding
-    // proposals and keep `AuctionContext` local to here
-    NewAuctions(Vec<AuctionContext>),
-    BuiltPayload(EthBuiltPayload),
-}
-
-pub struct Auctioneer {
-    msgs: Receiver<Message>,
+pub struct Auctioneer<
+    Engine: EngineTypes<
+        PayloadBuilderAttributes = BuilderPayloadBuilderAttributes,
+        BuiltPayload = EthBuiltPayload,
+    >,
+> {
     clock: broadcast::Receiver<ClockMessage>,
-    builder: Sender<BuilderMessage>,
+    builder: PayloadBuilderHandle<Engine>,
+    payload_store: PayloadStore<Engine>,
     relays: Vec<Arc<Relay>>,
-    auction_schedule: AuctionSchedule,
-    open_auctions: HashMap<PayloadId, Arc<AuctionContext>>,
     executor: TaskExecutor,
     config: Config,
     context: Arc<Context>,
+    // TODO consolidate this somewhere...
+    genesis_time: u64,
+    bidder_tx: Sender<BidderMessage>,
+    bidder: Receiver<BidderMessage>,
+
+    auction_schedule: AuctionSchedule,
+    open_auctions: HashMap<PayloadId, Arc<AuctionContext>>,
 }
 
@@ -97,70 +122,138 @@
-impl Auctioneer {
+impl<
+    Engine: EngineTypes<
+            PayloadBuilderAttributes = BuilderPayloadBuilderAttributes,
+            BuiltPayload = EthBuiltPayload,
+        > + 'static,
+    > Auctioneer<Engine>
+{
     pub fn new(
-        msgs: Receiver<Message>,
         clock: broadcast::Receiver<ClockMessage>,
-        builder: Sender<BuilderMessage>,
+        builder: PayloadBuilderHandle<Engine>,
         executor: TaskExecutor,
         mut config: Config,
         context: Arc<Context>,
+        genesis_time: u64,
     ) -> Self {
         let relays = parse_relay_endpoints(&config.relays)
             .into_iter()
             .collect::<Vec<_>>();
         config.public_key = config.secret_key.public_key();
+
+        let payload_store = builder.clone().into();
+
+        let (bidder_tx, bidder) = mpsc::channel(DEFAULT_COMPONENT_CHANNEL_SIZE);
+
         Self {
-            msgs,
             clock,
             builder,
+            payload_store,
             relays,
-            auction_schedule: Default::default(),
-            open_auctions: Default::default(),
             executor,
             config,
             context,
+            genesis_time,
+            bidder_tx,
+            bidder,
+            auction_schedule: Default::default(),
+            open_auctions: Default::default(),
+        }
+    }
+
+    async fn on_epoch(&mut self, epoch: Epoch) {
+        // TODO: parallel fetch, join set?
+        // TODO: batch updates to auction schedule
+        // TODO: consider fast data access once this stabilizes
+        for relay in self.relays.iter() {
+            match relay.get_proposal_schedule().await {
+                Ok(schedule) => {
+                    let slots = self.auction_schedule.process(relay.clone(), &schedule);
+                    info!(epoch, ?slots, %relay, "processed proposer schedule");
+                }
+                Err(err) => {
+                    warn!(err = %err, "error fetching proposer schedule from relay")
+                }
+            }
         }
+
+        // NOTE: clear stale state
+        let slot = epoch * self.context.slots_per_epoch;
+        self.auction_schedule.clear(slot);
+        self.open_auctions.retain(|_, auction| auction.slot >= slot);
     }
 
     fn take_proposals(&mut self, slot: Slot) -> Option<Proposals> {
         self.auction_schedule.take_matching_proposals(slot)
     }
 
+    async fn process_proposals(
+        &self,
+        slot: Slot,
+        attributes: BuilderPayloadBuilderAttributes,
+        proposals: Proposals,
+    ) -> Vec<AuctionContext> {
+        let mut new_auctions = vec![];
+        for (proposer, relays) in proposals {
+            let attributes = make_attributes_for_proposer(&attributes, &proposer);
+
+            if self.start_build(&attributes).await.is_some() {
+                // TODO: can likely skip full attributes in `AuctionContext`
+                // TODO: consider data layout here...
+                let auction = AuctionContext { slot, attributes, proposer, relays };
+                new_auctions.push(auction);
+            }
+        }
+        new_auctions
+    }
+
+    async fn start_build(&self, attributes: &BuilderPayloadBuilderAttributes) -> Option<PayloadId> {
+        // TODO: necessary to get response, other than no error?
+        match self.builder.new_payload(attributes.clone()).await {
+            Ok(payload_id) => {
+                let attributes_payload_id = attributes.payload_id();
+                if payload_id != attributes_payload_id {
+                    error!(%payload_id, %attributes_payload_id, "mismatch between computed payload id and the one returned by the payload builder");
+                }
+                Some(payload_id)
+            }
+            Err(err) => {
+                warn!(%err, "builder could not start build with payload builder");
+                None
+            }
+        }
+    }
+
     fn process_new_auction(&mut self, auction: AuctionContext) {
         let payload_id = auction.attributes.payload_id();
-        self.open_auctions.insert(payload_id, Arc::new(auction));
-        let auction = self.open_auctions.get(&payload_id).unwrap().clone();
+        // TODO: consider data layout in `open_auctions`
+        let auction = self.open_auctions.entry(payload_id).or_insert_with(|| Arc::new(auction));
 
-        let builder = self.builder.clone();
         // TODO refactor into independent actor
         // this works for now, but want bidding to happen on separate thread
+        let auctioneer = self.bidder_tx.clone();
+        let auction = auction.clone();
         self.executor.spawn_blocking(async move {
             let deadline = Duration::from_secs(1);
             let bidder = DeadlineBidder::new(deadline);
             match bidder.make_bid(&auction).await {
-                BidRequest::Ready(payload_id) => {
-                    builder
-                        .send(BuilderMessage::FetchPayload(payload_id, KeepAlive::No))
-                        .await
-                        .expect("can send");
+                BidRequest::Ready(payload_id, keep_alive) => {
+                    auctioneer.send((payload_id, keep_alive)).await.expect("can send");
                 }
             }
         });
     }
 
-    fn process_new_auctions(&mut self, auctions: Vec<AuctionContext>) {
-        for auction in auctions {
-            self.process_new_auction(auction);
+    async fn on_payload_attributes(&mut self, attributes: BuilderPayloadBuilderAttributes) {
+        // TODO: ignore already processed attributes
+
+        let slot = convert_timestamp_to_slot(
+            attributes.timestamp(),
+            self.genesis_time,
+            self.context.seconds_per_slot,
+        )
+        .expect("is past genesis");
+        // TODO: consolidate once stable
+        if let Some(proposals) = self.take_proposals(slot) {
+            let auctions = self.process_proposals(slot, attributes, proposals).await;
+            for auction in auctions {
+                self.process_new_auction(auction);
+            }
         }
     }
 
-    async fn on_epoch(&mut self, epoch: Epoch) {
-        // TODO: concurrent fetch
-        // TODO: batch updates to auction schedule
-        for relay in self.relays.iter() {
-            match relay.get_proposal_schedule().await {
-                Ok(schedule) => {
-                    let slots = self.auction_schedule.process(relay.clone(), &schedule);
-                    info!(epoch, ?slots, %relay, "processed proposer schedule");
-                }
-                Err(err) => {
-                    warn!(err = %err, "error fetching proposer schedule from relay")
-                }
+    async fn process_bid(&mut self, (payload_id, _keep_alive): BidderMessage) {
+        // TODO: may want to keep payload job running...
+        if let Some(payload) = self.payload_store.resolve(payload_id).await {
+            match payload {
+                Ok(payload) => self.submit_payload(payload).await,
+                Err(err) => warn!(%err, "payload resolution failed"),
             }
+        } else {
+            warn!(%payload_id, "no payload could be retrieved from payload store for bid")
         }
-
-        let slot = epoch * self.context.slots_per_epoch;
-        self.auction_schedule.clear(slot);
-
-        self.open_auctions.retain(|_, auction| auction.slot >= slot);
     }
 
     async fn submit_payload(&self, payload: EthBuiltPayload) {
@@ -183,6 +276,7 @@ impl Auctioneer {
         ) {
             Ok(signed_submission) => {
                 let relays = &auction.relays;
+                // TODO: parallel dispatch
                 for relay in relays {
                     if let Err(err) = relay.submit_bid(&signed_submission).await {
                         warn!(%err, ?relay, slot = auction.slot, "could not submit payload");
@@ -195,21 +289,14 @@
         }
     }
 
-    async fn dispatch(&mut self, message: Message) {
-        use Message::*;
-        match message {
-            ProposalQuery(slot, tx) => {
-                let proposals = self.take_proposals(slot);
-                tx.send(proposals).expect("can send");
-            }
-            NewAuctions(auctions) => self.process_new_auctions(auctions),
-            BuiltPayload(payload) => self.submit_payload(payload).await,
-        }
+    async fn dispatch_clock(&mut self, message: ClockMessage) {
+        let ClockMessage::NewEpoch(epoch) = message;
+        self.on_epoch(epoch).await;
     }
 
-    async fn dispatch_clock(&mut self, message: ClockMessage) {
-        if let ClockMessage::NewEpoch(epoch) = message {
-            self.on_epoch(epoch).await;
+    async fn dispatch_payload_event(&mut self, event: Events<Engine>) {
+        if let Events::Attributes(attributes) = event {
+            self.on_payload_attributes(attributes).await;
         }
     }
 
@@ -224,10 +311,17 @@
             }
         }
 
+        let mut payload_events =
+            self.builder.subscribe().await.expect("can subscribe to events").into_stream();
+
         loop {
             tokio::select! {
-                Some(message) = self.msgs.recv() => self.dispatch(message).await,
                 Ok(message) = self.clock.recv() => self.dispatch_clock(message).await,
+                Some(event) = payload_events.next() => match event {
+                    Ok(event) => self.dispatch_payload_event(event).await,
+                    Err(err) => warn!(%err, "error getting payload event"),
+                },
+                Some(message) = self.bidder.recv() => self.process_bid(message).await,
             }
         }
     }
diff --git a/mev-build-rs/src/bidder.rs b/mev-build-rs/src/bidder.rs
index 5f0ffa94..c541bb77 100644
--- a/mev-build-rs/src/bidder.rs
+++ b/mev-build-rs/src/bidder.rs
@@ -7,6 +7,13 @@ use reth::{api::PayloadBuilderAttributes, payload::PayloadId};
 use std::time::Duration;
 use tokio::time::sleep;
 
+pub type Message = (PayloadId, KeepAlive);
+
+#[derive(Debug)]
+pub enum KeepAlive {
+    No,
+}
+
 #[derive(Debug)]
 pub struct AuctionContext {
     pub slot: Slot,
@@ -25,7 +32,7 @@ pub struct DeadlineBidder {
 }
 
 pub enum BidRequest {
-    Ready(PayloadId),
+    Ready(PayloadId, KeepAlive),
 }
 
 impl DeadlineBidder {
@@ -37,6 +44,6 @@ impl DeadlineBidder {
         let target = duration_until(auction.attributes.timestamp());
         let duration = target.checked_sub(self.deadline).unwrap_or_default();
         sleep(duration).await;
-        BidRequest::Ready(auction.attributes.payload_id())
+        BidRequest::Ready(auction.attributes.payload_id(), KeepAlive::No)
     }
 }
diff --git a/mev-build-rs/src/builder.rs b/mev-build-rs/src/builder.rs
deleted file mode 100644
index 14246522..00000000
--- a/mev-build-rs/src/builder.rs
+++ /dev/null
@@ -1,196 +0,0 @@
-use crate::{
-    auction_schedule::{Proposals, Proposer},
-    auctioneer::Message as AuctioneerMessage,
-    bidder::AuctionContext,
-    payload::builder_attributes::{BuilderPayloadBuilderAttributes, ProposalAttributes},
-    Error,
-};
-use ethereum_consensus::{
-    clock::convert_timestamp_to_slot, primitives::Slot, state_transition::Context,
-};
-use reth::{
-    api::{EngineTypes, PayloadBuilderAttributes},
-    payload::{EthBuiltPayload, Events, PayloadBuilderHandle, PayloadId, PayloadStore},
-    primitives::{Address, Bytes},
-};
-use serde::Deserialize;
-use std::sync::Arc;
-use tokio::sync::{
-    mpsc::{Receiver, Sender},
-    oneshot,
-};
-use tokio_stream::StreamExt;
-use tracing::{error, warn};
-
-fn make_attributes_for_proposer(
-    attributes: &BuilderPayloadBuilderAttributes,
-    proposer: &Proposer,
-) -> BuilderPayloadBuilderAttributes {
-    let proposal = ProposalAttributes {
-        proposer_gas_limit: proposer.gas_limit,
-        proposer_fee_recipient: proposer.fee_recipient,
-    };
-    let mut attributes = attributes.clone();
-    attributes.attach_proposal(proposal);
-    attributes
-}
-
-pub enum KeepAlive {
-    No,
-}
-
-pub enum Message {
-    FetchPayload(PayloadId, KeepAlive),
-}
-
-#[derive(Deserialize, Debug, Default, Clone)]
-pub struct Config {
-    pub fee_recipient: Option<Address>,
-    pub genesis_time: Option<u64>,
-    pub extra_data: Option<Bytes>,
-    pub execution_mnemonic: String,
-}
-
-pub struct Builder<
-    Engine: EngineTypes<
-        PayloadBuilderAttributes = BuilderPayloadBuilderAttributes,
-        BuiltPayload = EthBuiltPayload,
-    >,
-> {
-    msgs: Receiver<Message>,
-    auctioneer: Sender<AuctioneerMessage>,
-    payload_builder: PayloadBuilderHandle<Engine>,
-    payload_store: PayloadStore<Engine>,
-    context: Arc<Context>,
-    genesis_time: u64,
-}
-
-impl<
-    Engine: EngineTypes<
-            PayloadBuilderAttributes = BuilderPayloadBuilderAttributes,
-            BuiltPayload = EthBuiltPayload,
-        > + 'static,
-    > Builder<Engine>
-{
-    pub fn new(
-        msgs: Receiver<Message>,
-        auctioneer: Sender<AuctioneerMessage>,
-        payload_builder: PayloadBuilderHandle<Engine>,
-        context: Arc<Context>,
-        genesis_time: u64,
-    ) -> Self {
-        let payload_store = payload_builder.clone().into();
-        Self { msgs, auctioneer, payload_builder, payload_store, context, genesis_time }
-    }
-
-    pub async fn process_proposals(
-        &self,
-        slot: Slot,
-        attributes: BuilderPayloadBuilderAttributes,
-        proposals: Option<Proposals>,
-    ) -> Result<Vec<AuctionContext>, Error> {
-        let mut new_auctions = vec![];
-
-        if let Some(proposals) = proposals {
-            for (proposer, relays) in proposals {
-                let attributes = make_attributes_for_proposer(&attributes, &proposer);
-
-                if self.start_build(&attributes).await.is_some() {
-                    // TODO: can likely skip full attributes in `AuctionContext`
-                    let auction = AuctionContext { slot, attributes, proposer, relays };
-                    new_auctions.push(auction);
-                }
-            }
-        }
-        Ok(new_auctions)
-    }
-
-    async fn start_build(&self, attributes: &BuilderPayloadBuilderAttributes) -> Option<PayloadId> {
-        match self.payload_builder.new_payload(attributes.clone()).await {
-            Ok(payload_id) => {
-                let attributes_payload_id = attributes.payload_id();
-                if payload_id != attributes_payload_id {
-                    error!(%payload_id, %attributes_payload_id, "mismatch between computed payload id and the one returned by the payload builder");
-                }
-                Some(payload_id)
-            }
-            Err(err) => {
-                warn!(%err, "builder could not start build with payload builder");
-                None
-            }
-        }
-    }
-
-    async fn on_payload_attributes(&self, attributes: BuilderPayloadBuilderAttributes) {
-        // TODO: ignore already processed attributes
-
-        // TODO: move slot calc to auctioneer?
-        let slot = convert_timestamp_to_slot(
-            attributes.timestamp(),
-            self.genesis_time,
-            self.context.seconds_per_slot,
-        )
-        .expect("is past genesis");
-        let (tx, rx) = oneshot::channel();
-        self.auctioneer.send(AuctioneerMessage::ProposalQuery(slot, tx)).await.expect("can send");
-        let proposals = rx.await.expect("can recv");
-        let auctions = self.process_proposals(slot, attributes, proposals).await;
-        match auctions {
-            Ok(auctions) => {
-                self.auctioneer
-                    .send(AuctioneerMessage::NewAuctions(auctions))
-                    .await
-                    .expect("can send");
-            }
-            Err(err) => {
-                warn!(%err, "could not send new auctions to auctioneer");
-            }
-        }
-    }
-
-    async fn send_payload_to_auctioneer(&self, payload_id: PayloadId, _keep_alive: KeepAlive) {
-        let maybe_payload = self.payload_store.resolve(payload_id).await;
-        if let Some(payload) = maybe_payload {
-            match payload {
-                Ok(payload) => self
-                    .auctioneer
-                    .send(AuctioneerMessage::BuiltPayload(payload))
-                    .await
-                    .expect("can send"),
-                Err(err) => {
-                    warn!(%err, %payload_id, "error resolving payload")
-                }
-            }
-        } else {
-            warn!(%payload_id, "could not resolve payload")
-        }
-    }
-
-    async fn dispatch(&self, message: Message) {
-        match message {
-            Message::FetchPayload(payload_id, keep_alive) => {
-                self.send_payload_to_auctioneer(payload_id, keep_alive).await;
-            }
-        }
-    }
-
-    pub async fn spawn(mut self) {
-        let mut payload_events =
-            self.payload_builder.subscribe().await.expect("can subscribe to events").into_stream();
-        loop {
-            tokio::select! {
-                Some(message) = self.msgs.recv() => self.dispatch(message).await,
-                Some(event) = payload_events.next() => match event {
-                    Ok(event) => {
-                        if let Events::Attributes(attributes) = event {
-                            self.on_payload_attributes(attributes).await;
-                        }
-                    }
-                    Err(err) => {
-                        warn!(%err, "error getting payload events");
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/mev-build-rs/src/lib.rs b/mev-build-rs/src/lib.rs
index f53d3a18..3e73702c 100644
--- a/mev-build-rs/src/lib.rs
+++ b/mev-build-rs/src/lib.rs
@@ -1,7 +1,6 @@
 mod auction_schedule;
 mod auctioneer;
 mod bidder;
-mod builder;
 mod error;
 mod node;
 mod payload;
diff --git a/mev-build-rs/src/payload/service_builder.rs b/mev-build-rs/src/payload/service_builder.rs
index 2fd41e33..c90a0894 100644
--- a/mev-build-rs/src/payload/service_builder.rs
+++ b/mev-build-rs/src/payload/service_builder.rs
@@ -1,10 +1,10 @@
 use crate::{
-    builder::Config,
     node::BuilderEngineTypes,
     payload::{
         builder::PayloadBuilder,
         job_generator::{PayloadJobGenerator, PayloadJobGeneratorConfig},
     },
+    service::BuilderConfig as Config,
     Error,
 };
 use alloy_signer_wallet::{coins_bip39::English, LocalWallet, MnemonicBuilder};
diff --git a/mev-build-rs/src/service.rs b/mev-build-rs/src/service.rs
index 782ed6f1..2fd5461d 100644
--- a/mev-build-rs/src/service.rs
+++ b/mev-build-rs/src/service.rs
@@ -1,6 +1,5 @@
 use crate::{
     auctioneer::{Auctioneer, Config as AuctioneerConfig},
-    builder::{Builder, Config as BuilderConfig},
     node::BuilderNode,
     payload::{
         builder_attributes::BuilderPayloadBuilderAttributes, service_builder::PayloadServiceBuilder,
@@ -15,17 +14,14 @@ use reth::{
     api::EngineTypes,
     builder::{InitState, WithLaunchContext},
     payload::{EthBuiltPayload, PayloadBuilderHandle},
-    primitives::NamedChain,
+    primitives::{Address, Bytes, NamedChain},
     tasks::TaskExecutor,
 };
 use reth_db::DatabaseEnv;
 use serde::Deserialize;
 use std::{path::PathBuf, sync::Arc};
 use tokio::{
-    sync::{
-        broadcast::{self, Sender},
-        mpsc,
-    },
+    sync::broadcast::{self, Sender},
     time::sleep,
 };
 use tokio_stream::StreamExt;
@@ -33,6 +29,14 @@ use tracing::warn;
 
 pub const DEFAULT_COMPONENT_CHANNEL_SIZE: usize = 16;
 
+#[derive(Deserialize, Debug, Default, Clone)]
+pub struct BuilderConfig {
+    pub fee_recipient: Option<Address>,
+    pub genesis_time: Option<u64>,
+    pub extra_data: Option<Bytes>,
+    pub execution_mnemonic: String,
+}
+
 #[derive(Deserialize, Debug, Default, Clone)]
 pub struct Config {
     // TODO: move to bidder
@@ -58,8 +62,7 @@ pub struct Services<
         BuiltPayload = EthBuiltPayload,
     >,
 > {
-    pub auctioneer: Auctioneer,
-    pub builder: Builder<Engine>,
+    pub auctioneer: Auctioneer<Engine>,
     pub clock: SystemClock,
     pub clock_tx: Sender<ClockMessage>,
     pub context: Arc<Context>,
@@ -76,29 +79,23 @@ pub async fn construct<
     task_executor: TaskExecutor,
     payload_builder: PayloadBuilderHandle<Engine>,
 ) -> Result<Services<Engine>, Error> {
-    let (clock_tx, clock_rx) = broadcast::channel(DEFAULT_COMPONENT_CHANNEL_SIZE);
     let context = Arc::new(Context::try_from(network)?);
     let genesis_time = get_genesis_time(&context, config.beacon_node_url.as_ref(), None).await;
     let clock = context.clock_at(genesis_time);
 
-    let (builder_tx, builder_rx) = mpsc::channel(DEFAULT_COMPONENT_CHANNEL_SIZE);
-    let (auctioneer_tx, auctioneer_rx) = mpsc::channel(DEFAULT_COMPONENT_CHANNEL_SIZE);
-
-    let builder =
-        Builder::new(builder_rx, auctioneer_tx, payload_builder, context.clone(), genesis_time);
-
+    let (clock_tx, clock_rx) = broadcast::channel(DEFAULT_COMPONENT_CHANNEL_SIZE);
     let auctioneer = Auctioneer::new(
-        auctioneer_rx,
         clock_rx,
-        builder_tx,
+        payload_builder,
         task_executor,
         config.auctioneer,
         context.clone(),
+        genesis_time,
     );
 
-    Ok(Services { auctioneer, builder, clock, clock_tx, context })
+    Ok(Services { auctioneer, clock, clock_tx, context })
 }
 
 fn custom_network_from_config_directory(path: PathBuf) -> Network {
@@ -140,11 +137,10 @@ pub async fn launch(
     let task_executor = handle.node.task_executor.clone();
     let payload_builder = handle.node.payload_builder.clone();
 
-    let Services { auctioneer, builder, clock, clock_tx, context } =
+    let Services { auctioneer, clock, clock_tx, context } =
         construct(network, config, task_executor, payload_builder).await?;
 
     handle.node.task_executor.spawn_critical("mev-builder/auctioneer", auctioneer.spawn());
-    handle.node.task_executor.spawn_critical("mev-builder/builder", builder.spawn());
     handle.node.task_executor.spawn_critical("mev-builder/clock", async move {
         if clock.before_genesis() {
             let duration = clock.duration_until_next_slot();
@@ -152,15 +148,13 @@
             sleep(duration).await;
         }
 
+        // TODO: block on sync here to avoid spurious first PA?
+
         let mut current_epoch = clock.current_epoch().expect("past genesis");
         clock_tx.send(ClockMessage::NewEpoch(current_epoch)).expect("can send");
         let mut slots = clock.into_stream();
 
         while let Some(slot) = slots.next().await {
-            if let Err(err) = clock_tx.send(ClockMessage::NewSlot) {
-                let msg = err.0;
-                warn!(?msg, "could not update receivers with new slot")
-            }
             let epoch = slot / context.slots_per_epoch;
             if epoch > current_epoch {
                 current_epoch = epoch;
@@ -177,6 +171,5 @@
 
 #[derive(Debug, Clone)]
 pub enum ClockMessage {
-    NewSlot,
     NewEpoch(Epoch),
 }