Skip to content

Commit

Permalink
impl basic bidder
Browse files Browse the repository at this point in the history
  • Loading branch information
ralexstokes committed Apr 30, 2024
1 parent 33db163 commit 5b89ae8
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 74 deletions.
4 changes: 2 additions & 2 deletions example.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ extra_data = "0x68656C6C6F20776F726C640A" # "hello world"
execution_mnemonic = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"

[builder.bidder]
# number of milliseconds to submit bids ahead of the target slot
bidding_deadline_ms = 1000
# amount in milliseconds of time to wait until submitting bids
wait_until_ms = 3000
# [optional] amount of value to bid as a fraction of the payload's revenue
# if missing, defaults to 1.0 (100%)
# validation: should be between [0, 1] inclusive.
Expand Down
64 changes: 42 additions & 22 deletions mev-build-rs/src/auctioneer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ pub struct Service<
context: Arc<Context>,
// TODO consolidate this somewhere...
genesis_time: u64,
bidder: Sender<BidderMessage>,
bid_dispatch: Receiver<BidderMessage>,
bidder_tx: Sender<BidderMessage>,
bidder_rx: Receiver<BidderMessage>,

auction_schedule: AuctionSchedule,
open_auctions: HashMap<PayloadId, Arc<AuctionContext>>,
Expand All @@ -117,8 +117,8 @@ impl<
pub fn new(
clock: broadcast::Receiver<ClockMessage>,
builder: PayloadBuilderHandle<Engine>,
bidder: Sender<BidderMessage>,
bid_dispatch: Receiver<BidderMessage>,
bidder_tx: Sender<BidderMessage>,
bidder_rx: Receiver<BidderMessage>,
mut config: Config,
context: Arc<Context>,
genesis_time: u64,
Expand All @@ -140,8 +140,8 @@ impl<
config,
context,
genesis_time,
bidder,
bid_dispatch,
bidder_tx,
bidder_rx,
auction_schedule: Default::default(),
open_auctions: Default::default(),
}
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<
// TODO: consider data layout in `open_auctions`
let auction = self.open_auctions.entry(payload_id).or_insert_with(|| Arc::new(auction));

self.bidder.send(BidderMessage::NewAuction(auction.clone())).await.expect("can send");
self.bidder_tx.send(BidderMessage::NewAuction(auction.clone())).await.expect("can send");
}

async fn on_payload_attributes(&mut self, attributes: BuilderPayloadBuilderAttributes) {
Expand All @@ -237,18 +237,38 @@ impl<
}

async fn process_bid_update(&mut self, message: BidderMessage) {
if let BidderMessage::Dispatch(payload_id, _keep_alive) = message {
// TODO: may want to keep payload job running...
// NOTE: move back to builder interface over payload builder, that can also do this on
// its own thread?
if let Some(payload) = self.payload_store.resolve(payload_id).await {
match payload {
Ok(payload) => self.submit_payload(payload).await,
Err(err) => warn!(%err, "payload resolution failed"),
match message {
BidderMessage::RevenueQuery(payload_id, tx) => {
// TODO: store this payload (by hash) so that the bid that returns targets something
// stable...
if let Some(payload) = self.payload_store.best_payload(payload_id).await {
match payload {
Ok(payload) => {
// TODO: send more dynamic updates
// by the time the bidder submits a value the best payload may have
// already changed
tx.send(Ok(payload.fees())).expect("can send");
return
}
Err(err) => warn!(%err, "could not get best payload from payload store"),
}
}
// fallback
tx.send(Err(Error::MissingPayload(payload_id))).expect("can send");
}
BidderMessage::Dispatch { payload_id, value, keep_alive: _keep_alive } => {
// TODO: forward keep alive signal to builder
self.builder_tx.send((payload_id, value)).await.expect("can send");
if let Some(payload) = self.payload_store.resolve(payload_id).await {
match payload {
Ok(payload) => self.submit_payload(payload).await,
Err(err) => warn!(%err, "payload resolution failed"),
}
} else {
warn!(%payload_id, "no payload could be retrieved from payload store for bid")
}
} else {
warn!(%payload_id, "no payload could be retrieved from payload store for bid")
}
_ => {}
}
}

Expand Down Expand Up @@ -285,12 +305,12 @@ impl<
}
}

async fn dispatch_clock(&mut self, message: ClockMessage) {
async fn process_clock(&mut self, message: ClockMessage) {
let ClockMessage::NewEpoch(epoch) = message;
self.on_epoch(epoch).await;
}

async fn dispatch_payload_event(&mut self, event: Events<Engine>) {
async fn process_payload_event(&mut self, event: Events<Engine>) {
if let Events::Attributes(attributes) = event {
self.on_payload_attributes(attributes).await;
}
Expand All @@ -312,12 +332,12 @@ impl<

loop {
tokio::select! {
Ok(message) = self.clock.recv() => self.dispatch_clock(message).await,
Ok(message) = self.clock.recv() => self.process_clock(message).await,
Some(event) = payload_events.next() => match event {
Ok(event) => self.dispatch_payload_event(event).await,
Ok(event) => self.process_payload_event(event).await,
Err(err) => warn!(%err, "error getting payload event"),
},
Some(message) = self.bid_dispatch.recv() => self.process_bid_update(message).await,
Some(message) = self.bidder_rx.recv() => self.process_bid_update(message).await,
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion mev-build-rs/src/bidder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
mod service;
pub mod strategies;

use std::time::Duration;

use reth::primitives::U256;
pub use service::{Message, Service};
pub use strategies::Config;

pub use service::{BidStatus, KeepAlive, Message, Service};
/// Do we expect to submit more bids or not?
#[derive(Debug, Clone, Copy)]
pub enum KeepAlive {
Yes,
}

pub enum Bid {
Wait(Duration),
Submit { value: U256, keep_alive: KeepAlive },
}
95 changes: 65 additions & 30 deletions mev-build-rs/src/bidder/service.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,94 @@
use crate::{
auctioneer::AuctionContext,
bidder::{strategies::DeadlineBidder, Config},
bidder::{strategies::BasicStrategy, Bid, Config, KeepAlive},
Error,
};
use ethereum_consensus::clock::duration_until;
use reth::{
api::PayloadBuilderAttributes, payload::PayloadId, primitives::U256, tasks::TaskExecutor,
};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{
mpsc::{Receiver, Sender},
oneshot,
},
time::{sleep, timeout},
};
use tracing::debug;

/// All bidding routines stop this many seconds *after* the timestamp of the proposal
/// regardless of what the bidding strategy suggests
pub const DEFAULT_BIDDING_DEADLINE_AFTER_SLOT: u64 = 1;

pub enum Message {
NewAuction(Arc<AuctionContext>),
Dispatch(PayloadId, KeepAlive),
}

#[derive(Debug)]
pub enum KeepAlive {
No,
}

pub enum BidStatus {
Submit { value: U256, keep_alive: KeepAlive },
Dispatch { payload_id: PayloadId, value: U256, keep_alive: KeepAlive },
RevenueQuery(PayloadId, oneshot::Sender<Result<U256, Error>>),
}

pub struct Service {
auctioneer: Receiver<Message>,
bid_dispatch: Sender<Message>,
auctioneer_rx: Receiver<Message>,
auctioneer_tx: Sender<Message>,
executor: TaskExecutor,
config: Config,
}

impl Service {
pub fn new(
auctioneer: Receiver<Message>,
bid_dispatch: Sender<Message>,
auctioneer_rx: Receiver<Message>,
auctioneer_tx: Sender<Message>,
executor: TaskExecutor,
config: Config,
) -> Self {
Self { auctioneer, bid_dispatch, executor, config }
Self { auctioneer_rx, auctioneer_tx, executor, config }
}

fn start_bid(&mut self, auction: Arc<AuctionContext>) {
let dispatcher = self.bid_dispatch.clone();
let auctioneer = self.auctioneer_tx.clone();
// TODO: make strategies configurable...
let mut strategy = DeadlineBidder::new(&self.config);
let mut strategy = BasicStrategy::new(&self.config);
let duration_after_slot = Duration::from_secs(DEFAULT_BIDDING_DEADLINE_AFTER_SLOT);
let max_bidding_duration = duration_until(auction.attributes.timestamp())
.checked_add(duration_after_slot)
.unwrap_or_default();
self.executor.spawn_blocking(async move {
// TODO get current fees from builder
let fees = U256::from(100);
let BidStatus::Submit { value: _value, keep_alive } =
strategy.run(&auction, fees).await;
// TODO send value to builder
// TODO issues with timeout and open channels?
let _ = timeout(max_bidding_duration, async move {
let payload_id = auction.attributes.payload_id();
let mut should_run = KeepAlive::Yes;
while matches!(should_run, KeepAlive::Yes) {
let (tx, rx) = oneshot::channel();
let message = Message::RevenueQuery(payload_id, tx);
auctioneer.send(message).await.expect("can send");
let current_revenue = match rx.await.expect("can recv") {
Ok(fees) => fees,
Err(err) => {
// NOTE: if there was an error, try to fetch
// again without running a strategy
// TODO: handle case when the auction has terminated and we should
// also terminate
debug!(%err, "could not get current revenue; trying again");
continue
}
};

dispatcher
.send(Message::Dispatch(auction.attributes.payload_id(), keep_alive))
.await
.expect("can send");
match strategy.run(&auction, current_revenue).await {
Bid::Wait(duration) => {
sleep(duration).await;
continue
}
Bid::Submit { value, keep_alive } => {
should_run = keep_alive;
auctioneer
.send(Message::Dispatch { payload_id, value, keep_alive })
.await
.expect("can send");
}
}
}
})
.await;
});
}

Expand All @@ -66,7 +101,7 @@ impl Service {
pub async fn spawn(mut self) {
loop {
tokio::select! {
Some(message) = self.auctioneer.recv() => self.dispatch(message).await,
Some(message) = self.auctioneer_rx.recv() => self.dispatch(message).await,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
use crate::{
auctioneer::AuctionContext,
bidder::{BidStatus, KeepAlive},
bidder::{Bid, KeepAlive},
};
use ethereum_consensus::clock::duration_until;
use reth::{api::PayloadBuilderAttributes, primitives::U256};
use serde::Deserialize;
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::{interval, Interval};

pub const DEFAULT_BID_INTERVAL: u64 = 1;

#[derive(Deserialize, Debug, Default, Clone)]
pub struct Config {
// amount in milliseconds
pub bidding_deadline_ms: u64,
// amount in milliseconds of time to wait until submitting bids
pub wait_until_ms: u64,
// amount to bid as a fraction of the block's value
// if missing, default to 100%
// TODO: use to price bid
pub bid_percent: Option<f64>,
// amount to add from the builder's wallet as a subsidy to the auction bid
// TODO: use to adjust bid
// if missing, defaults to 0
pub subsidy_wei: Option<U256>,
}

Expand All @@ -26,17 +27,20 @@ pub struct Config {
///
/// For example, if the `deadline` is 1 second, then the bidder will return
/// a value to bid one second before the start of the build's target slot.
pub struct DeadlineBidder {
deadline: Duration,
pub struct BasicStrategy {
wait_until: Duration,
bid_interval: Interval,
bid_percent: f64,
subsidy_wei: U256,
}

impl DeadlineBidder {
impl BasicStrategy {
pub fn new(config: &Config) -> Self {
let deadline = Duration::from_millis(config.bidding_deadline_ms);
let wait_until = Duration::from_millis(config.wait_until_ms);
let bid_interval = interval(Duration::from_secs(DEFAULT_BID_INTERVAL));
Self {
deadline,
wait_until,
bid_interval,
bid_percent: config.bid_percent.unwrap_or(1.0).clamp(0.0, 1.0),
subsidy_wei: config.subsidy_wei.unwrap_or(U256::ZERO),
}
Expand All @@ -48,11 +52,19 @@ impl DeadlineBidder {
value
}

pub async fn run(&mut self, auction: &AuctionContext, current_revenue: U256) -> BidStatus {
let value = self.compute_value(current_revenue);
pub async fn run(&mut self, auction: &AuctionContext, current_revenue: U256) -> Bid {
// First, we wait until we are near the auction deadline
let target = duration_until(auction.attributes.timestamp());
let duration = target.checked_sub(self.deadline).unwrap_or_default();
sleep(duration).await;
BidStatus::Submit { value, keep_alive: KeepAlive::No }
let wait_until = target.checked_sub(self.wait_until).unwrap_or_default();
if !wait_until.is_zero() {
return Bid::Wait(wait_until)
}

// If we are near the auction deadline, start submitting bids
// with one bid per tick of the interval
self.bid_interval.tick().await;

let value = self.compute_value(current_revenue);
Bid::Submit { value, keep_alive: KeepAlive::Yes }
}
}
4 changes: 2 additions & 2 deletions mev-build-rs/src/bidder/strategies/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod deadline;
mod basic;

pub use deadline::{Config, DeadlineBidder};
pub use basic::{BasicStrategy, Config};
4 changes: 3 additions & 1 deletion mev-build-rs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy_signer_wallet::WalletError;
use ethereum_consensus::Error as ConsensusError;
use reth::payload::error::PayloadBuilderError;
use reth::payload::{error::PayloadBuilderError, PayloadId};
use thiserror::Error;

#[derive(Error, Debug)]
Expand All @@ -11,4 +11,6 @@ pub enum Error {
PayloadBuilderError(#[from] PayloadBuilderError),
#[error(transparent)]
WalletError(#[from] WalletError),
#[error("could not find payload {0}")]
MissingPayload(PayloadId),
}

0 comments on commit 5b89ae8

Please sign in to comment.