diff --git a/example.config.toml b/example.config.toml index 47286e02..0b35db98 100644 --- a/example.config.toml +++ b/example.config.toml @@ -37,8 +37,8 @@ extra_data = "0x68656C6C6F20776F726C640A" # "hello world" execution_mnemonic = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" [builder.bidder] -# number of milliseconds to submit bids ahead of the target slot -bidding_deadline_ms = 1000 +# amount in milliseconds of time to wait until submitting bids +wait_until_ms = 3000 # [optional] amount of value to bid as a fraction of the payload's revenue # if missing, defaults to 1.0 (100%) # validation: should be between [0, 1] inclusive. diff --git a/mev-build-rs/src/auctioneer/service.rs b/mev-build-rs/src/auctioneer/service.rs index 81e9ee76..d980e902 100644 --- a/mev-build-rs/src/auctioneer/service.rs +++ b/mev-build-rs/src/auctioneer/service.rs @@ -100,8 +100,8 @@ pub struct Service< context: Arc, // TODO consolidate this somewhere... genesis_time: u64, - bidder: Sender, - bid_dispatch: Receiver, + bidder_tx: Sender, + bidder_rx: Receiver, auction_schedule: AuctionSchedule, open_auctions: HashMap>, @@ -117,8 +117,8 @@ impl< pub fn new( clock: broadcast::Receiver, builder: PayloadBuilderHandle, - bidder: Sender, - bid_dispatch: Receiver, + bidder_tx: Sender, + bidder_rx: Receiver, mut config: Config, context: Arc, genesis_time: u64, @@ -140,8 +140,8 @@ impl< config, context, genesis_time, - bidder, - bid_dispatch, + bidder_tx, + bidder_rx, auction_schedule: Default::default(), open_auctions: Default::default(), } @@ -215,7 +215,7 @@ impl< // TODO: consider data layout in `open_auctions` let auction = self.open_auctions.entry(payload_id).or_insert_with(|| Arc::new(auction)); - self.bidder.send(BidderMessage::NewAuction(auction.clone())).await.expect("can send"); + self.bidder_tx.send(BidderMessage::NewAuction(auction.clone())).await.expect("can send"); } async fn on_payload_attributes(&mut self, attributes: BuilderPayloadBuilderAttributes) { @@ -237,18 +237,38 @@ impl< } async fn process_bid_update(&mut self, message: BidderMessage) { - if let BidderMessage::Dispatch(payload_id, _keep_alive) = message { - // TODO: may want to keep payload job running... - // NOTE: move back to builder interface over payload builder, that can also do this on - // its own thread? - if let Some(payload) = self.payload_store.resolve(payload_id).await { - match payload { - Ok(payload) => self.submit_payload(payload).await, - Err(err) => warn!(%err, "payload resolution failed"), + match message { + BidderMessage::RevenueQuery(payload_id, tx) => { + // TODO: store this payload (by hash) so that the bid that returns targets something + // stable... + if let Some(payload) = self.payload_store.best_payload(payload_id).await { + match payload { + Ok(payload) => { + // TODO: send more dynamic updates + // by the time the bidder submits a value the best payload may have + // already changed + tx.send(Ok(payload.fees())).expect("can send"); + return + } + Err(err) => warn!(%err, "could not get best payload from payload store"), + } + } + // fallback + tx.send(Err(Error::MissingPayload(payload_id))).expect("can send"); + } + BidderMessage::Dispatch { payload_id, value, keep_alive: _keep_alive } => { + // TODO: forward keep alive signal to builder + self.builder_tx.send((payload_id, value)).await.expect("can send"); + if let Some(payload) = self.payload_store.resolve(payload_id).await { + match payload { + Ok(payload) => self.submit_payload(payload).await, + Err(err) => warn!(%err, "payload resolution failed"), + } + } else { + warn!(%payload_id, "no payload could be retrieved from payload store for bid") } - } else { - warn!(%payload_id, "no payload could be retrieved from payload store for bid") } + _ => {} } } @@ -285,12 +305,12 @@ impl< } } - async fn dispatch_clock(&mut self, message: ClockMessage) { + async fn process_clock(&mut self, message: ClockMessage) { let ClockMessage::NewEpoch(epoch) = message; self.on_epoch(epoch).await; } - async fn dispatch_payload_event(&mut self, event: Events) { + async fn process_payload_event(&mut self, event: Events) { if let Events::Attributes(attributes) = event { self.on_payload_attributes(attributes).await; } @@ -312,12 +332,12 @@ impl< loop { tokio::select! { - Ok(message) = self.clock.recv() => self.dispatch_clock(message).await, + Ok(message) = self.clock.recv() => self.process_clock(message).await, Some(event) = payload_events.next() => match event { - Ok(event) => self.dispatch_payload_event(event).await, + Ok(event) => self.process_payload_event(event).await, Err(err) => warn!(%err, "error getting payload event"), }, - Some(message) = self.bid_dispatch.recv() => self.process_bid_update(message).await, + Some(message) = self.bidder_rx.recv() => self.process_bid_update(message).await, } } } diff --git a/mev-build-rs/src/bidder/mod.rs b/mev-build-rs/src/bidder/mod.rs index 5d0dd8cc..7e001b20 100644 --- a/mev-build-rs/src/bidder/mod.rs +++ b/mev-build-rs/src/bidder/mod.rs @@ -1,6 +1,19 @@ mod service; pub mod strategies; +use std::time::Duration; + +use reth::primitives::U256; +pub use service::{Message, Service}; pub use strategies::Config; -pub use service::{BidStatus, KeepAlive, Message, Service}; +/// Do we expect to submit more bids or not? +#[derive(Debug, Clone, Copy)] +pub enum KeepAlive { + Yes, +} + +pub enum Bid { + Wait(Duration), + Submit { value: U256, keep_alive: KeepAlive }, +} diff --git a/mev-build-rs/src/bidder/service.rs b/mev-build-rs/src/bidder/service.rs index 699b9e27..52784172 100644 --- a/mev-build-rs/src/bidder/service.rs +++ b/mev-build-rs/src/bidder/service.rs @@ -1,59 +1,94 @@ use crate::{ auctioneer::AuctionContext, - bidder::{strategies::DeadlineBidder, Config}, + bidder::{strategies::BasicStrategy, Bid, Config, KeepAlive}, + Error, }; +use ethereum_consensus::clock::duration_until; use reth::{ api::PayloadBuilderAttributes, payload::PayloadId, primitives::U256, tasks::TaskExecutor, }; -use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + sync::{ + mpsc::{Receiver, Sender}, + oneshot, + }, + time::{sleep, timeout}, +}; +use tracing::debug; + +/// All bidding routines stop this many seconds *after* the timestamp of the proposal +/// regardless of what the bidding strategy suggests +pub const DEFAULT_BIDDING_DEADLINE_AFTER_SLOT: u64 = 1; pub enum Message { NewAuction(Arc), - Dispatch(PayloadId, KeepAlive), -} - -#[derive(Debug)] -pub enum KeepAlive { - No, -} - -pub enum BidStatus { - Submit { value: U256, keep_alive: KeepAlive }, + Dispatch { payload_id: PayloadId, value: U256, keep_alive: KeepAlive }, + RevenueQuery(PayloadId, oneshot::Sender>), } pub struct Service { - auctioneer: Receiver, - bid_dispatch: Sender, + auctioneer_rx: Receiver, + auctioneer_tx: Sender, executor: TaskExecutor, config: Config, } impl Service { pub fn new( - auctioneer: Receiver, - bid_dispatch: Sender, + auctioneer_rx: Receiver, + auctioneer_tx: Sender, executor: TaskExecutor, config: Config, ) -> Self { - Self { auctioneer, bid_dispatch, executor, config } + Self { auctioneer_rx, auctioneer_tx, executor, config } } fn start_bid(&mut self, auction: Arc) { - let dispatcher = self.bid_dispatch.clone(); + let auctioneer = self.auctioneer_tx.clone(); // TODO: make strategies configurable... - let mut strategy = DeadlineBidder::new(&self.config); + let mut strategy = BasicStrategy::new(&self.config); + let duration_after_slot = Duration::from_secs(DEFAULT_BIDDING_DEADLINE_AFTER_SLOT); + let max_bidding_duration = duration_until(auction.attributes.timestamp()) + .checked_add(duration_after_slot) + .unwrap_or_default(); self.executor.spawn_blocking(async move { - // TODO get current fees from builder - let fees = U256::from(100); - let BidStatus::Submit { value: _value, keep_alive } = - strategy.run(&auction, fees).await; - // TODO send value to builder + // TODO issues with timeout and open channels? + let _ = timeout(max_bidding_duration, async move { + let payload_id = auction.attributes.payload_id(); + let mut should_run = KeepAlive::Yes; + while matches!(should_run, KeepAlive::Yes) { + let (tx, rx) = oneshot::channel(); + let message = Message::RevenueQuery(payload_id, tx); + auctioneer.send(message).await.expect("can send"); + let current_revenue = match rx.await.expect("can recv") { + Ok(fees) => fees, + Err(err) => { + // NOTE: if there was an error, try to fetch + // again without running a strategy + // TODO: handle case when the auction has terminated and we should + // also terminate + debug!(%err, "could not get current revenue; trying again"); + continue + } + }; - dispatcher - .send(Message::Dispatch(auction.attributes.payload_id(), keep_alive)) - .await - .expect("can send"); + match strategy.run(&auction, current_revenue).await { + Bid::Wait(duration) => { + sleep(duration).await; + continue + } + Bid::Submit { value, keep_alive } => { + should_run = keep_alive; + auctioneer + .send(Message::Dispatch { payload_id, value, keep_alive }) + .await + .expect("can send"); + } + } + } + }) + .await; }); } @@ -66,7 +101,7 @@ impl Service { pub async fn spawn(mut self) { loop { tokio::select! { - Some(message) = self.auctioneer.recv() => self.dispatch(message).await, + Some(message) = self.auctioneer_rx.recv() => self.dispatch(message).await, } } } diff --git a/mev-build-rs/src/bidder/strategies/deadline.rs b/mev-build-rs/src/bidder/strategies/basic.rs similarity index 59% rename from mev-build-rs/src/bidder/strategies/deadline.rs rename to mev-build-rs/src/bidder/strategies/basic.rs index 80558a11..984b85bf 100644 --- a/mev-build-rs/src/bidder/strategies/deadline.rs +++ b/mev-build-rs/src/bidder/strategies/basic.rs @@ -1,23 +1,24 @@ use crate::{ auctioneer::AuctionContext, - bidder::{BidStatus, KeepAlive}, + bidder::{Bid, KeepAlive}, }; use ethereum_consensus::clock::duration_until; use reth::{api::PayloadBuilderAttributes, primitives::U256}; use serde::Deserialize; use std::time::Duration; -use tokio::time::sleep; +use tokio::time::{interval, Interval}; + +pub const DEFAULT_BID_INTERVAL: u64 = 1; #[derive(Deserialize, Debug, Default, Clone)] pub struct Config { - // amount in milliseconds - pub bidding_deadline_ms: u64, + // amount in milliseconds of time to wait until submitting bids + pub wait_until_ms: u64, // amount to bid as a fraction of the block's value // if missing, default to 100% - // TODO: use to price bid pub bid_percent: Option, // amount to add from the builder's wallet as a subsidy to the auction bid - // TODO: use to adjust bid + // if missing, defaults to 0 pub subsidy_wei: Option, } @@ -26,17 +27,20 @@ pub struct Config { /// /// For example, if the `deadline` is 1 second, then the bidder will return /// a value to bid one second before the start of the build's target slot. -pub struct DeadlineBidder { - deadline: Duration, +pub struct BasicStrategy { + wait_until: Duration, + bid_interval: Interval, bid_percent: f64, subsidy_wei: U256, } -impl DeadlineBidder { +impl BasicStrategy { pub fn new(config: &Config) -> Self { - let deadline = Duration::from_millis(config.bidding_deadline_ms); + let wait_until = Duration::from_millis(config.wait_until_ms); + let bid_interval = interval(Duration::from_secs(DEFAULT_BID_INTERVAL)); Self { - deadline, + wait_until, + bid_interval, bid_percent: config.bid_percent.unwrap_or(1.0).clamp(0.0, 1.0), subsidy_wei: config.subsidy_wei.unwrap_or(U256::ZERO), } @@ -48,11 +52,19 @@ impl DeadlineBidder { value } - pub async fn run(&mut self, auction: &AuctionContext, current_revenue: U256) -> BidStatus { - let value = self.compute_value(current_revenue); + pub async fn run(&mut self, auction: &AuctionContext, current_revenue: U256) -> Bid { + // First, we wait until we are near the auction deadline let target = duration_until(auction.attributes.timestamp()); - let duration = target.checked_sub(self.deadline).unwrap_or_default(); - sleep(duration).await; - BidStatus::Submit { value, keep_alive: KeepAlive::No } + let wait_until = target.checked_sub(self.wait_until).unwrap_or_default(); + if !wait_until.is_zero() { + return Bid::Wait(wait_until) + } + + // If we are near the auction deadline, start submitting bids + // with one bid per tick of the interval + self.bid_interval.tick().await; + + let value = self.compute_value(current_revenue); + Bid::Submit { value, keep_alive: KeepAlive::Yes } } } diff --git a/mev-build-rs/src/bidder/strategies/mod.rs b/mev-build-rs/src/bidder/strategies/mod.rs index 6924a0c8..b844f17d 100644 --- a/mev-build-rs/src/bidder/strategies/mod.rs +++ b/mev-build-rs/src/bidder/strategies/mod.rs @@ -1,3 +1,3 @@ -mod deadline; +mod basic; -pub use deadline::{Config, DeadlineBidder}; +pub use basic::{BasicStrategy, Config}; diff --git a/mev-build-rs/src/error.rs b/mev-build-rs/src/error.rs index d48e323f..6fd791f3 100644 --- a/mev-build-rs/src/error.rs +++ b/mev-build-rs/src/error.rs @@ -1,6 +1,6 @@ use alloy_signer_wallet::WalletError; use ethereum_consensus::Error as ConsensusError; -use reth::payload::error::PayloadBuilderError; +use reth::payload::{error::PayloadBuilderError, PayloadId}; use thiserror::Error; #[derive(Error, Debug)] @@ -11,4 +11,6 @@ pub enum Error { PayloadBuilderError(#[from] PayloadBuilderError), #[error(transparent)] WalletError(#[from] WalletError), + #[error("could not find payload {0}")] + MissingPayload(PayloadId), }