Skip to content

Commit

Permalink
add Bidder abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
ralexstokes committed Sep 7, 2023
1 parent df6cc54 commit 96c8c5b
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 88 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ members = ["bin/mev", "mev-boost-rs", "mev-relay-rs", "mev-build-rs", "mev-rs"]
default-members = ["bin/mev"]

[workspace.dependencies]
ethereum-consensus = { git = "https://github.com/ralexstokes/ethereum-consensus", rev = "4d4e3fb57584d9c3bba6b4fc488f23db4937ae79" }
beacon-api-client = { git = "https://github.com/ralexstokes/beacon-api-client", rev = "565d4e429ded4ffde944a96ec190ae00b1aba548" }
ethereum-consensus = { git = "https://github.com/ralexstokes/ethereum-consensus", rev = "8d85daf5b50e522bde351eabf6340ccfd0030db1" }
beacon-api-client = { git = "https://github.com/ralexstokes/beacon-api-client", rev = "98d075fc0af06eb794e89260d1306f89b01cabd8" }
ssz_rs = "0.9.0"

reth-payload-builder = { git = "https://github.com/paradigmxyz/reth", rev = "e3457b8" }
Expand Down
4 changes: 2 additions & 2 deletions bin/mev/src/cmd/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod reth_ext;
use anyhow::Result;
use clap::Args;
use mev_rs::Network;
use reth_ext::{launch_reth, RethNodeExt};
use reth_ext::{launch_reth_with, RethNodeExt};

#[derive(Debug, Args)]
#[clap(about = "🛠️ building blocks since 2023")]
Expand All @@ -15,7 +15,7 @@ pub struct Command {
impl Command {
pub async fn execute(&self, network: Network) -> Result<()> {
let ext = RethNodeExt { config_file: self.config_file.clone(), network };
launch_reth(ext).await;
launch_reth_with(ext).await;
Ok(())
}
}
29 changes: 23 additions & 6 deletions bin/mev/src/cmd/build/reth_ext.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::cmd::config::Config;
use clap::{Args, Parser};
use mev_build_rs::reth_builder::{Config as BuildConfig, Service};
use ethereum_consensus::{networks, state_transition::Context};
use mev_build_rs::reth_builder::{Config as BuildConfig, DeadlineBidder, Service};
use mev_rs::Network;
use reth::{
cli::ext::{RethCliExt, RethNodeCommandExt},
Expand All @@ -9,6 +10,7 @@ use reth::{
tasks::TaskManager,
};
use reth_payload_builder::PayloadBuilderService;
use std::{sync::Arc, time::Duration};

struct RethExt;

Expand Down Expand Up @@ -53,9 +55,24 @@ impl RethNodeCommandExt for RethNodeExt {
Tasks: reth::tasks::TaskSpawner + Clone + Unpin + 'static,
{
let build_config = self.to_config();
let (service, builder) =
Service::from(build_config, None, pool.clone(), provider.clone(), chain_spec.clone())
.unwrap();
let network = &build_config.network;
let context = Arc::new(Context::try_from(network)?);
let clock = context.clock().unwrap_or_else(|| {
let genesis_time = networks::typical_genesis_time(&context);
context.clock_at(genesis_time)
});
let deadline = Duration::from_millis(build_config.bidding_deadline_ms);
let bidder = DeadlineBidder::new(clock.clone(), deadline);
let (service, builder) = Service::from(
build_config,
context,
clock,
pool.clone(),
provider.clone(),
bidder,
chain_spec.clone(),
)
.unwrap();

let (payload_service, payload_builder) = PayloadBuilderService::new(builder);

Expand All @@ -73,14 +90,14 @@ impl RethNodeCommandExt for RethNodeExt {
}
});

executor.spawn_critical("mempool builder", fut);
executor.spawn_critical("boost builder", fut);
executor.spawn_critical("payload builder service", Box::pin(payload_service));

Ok(payload_builder)
}
}

pub(crate) async fn launch_reth(ext: RethNodeExt) {
pub(crate) async fn launch_reth_with(ext: RethNodeExt) {
let task_manager = TaskManager::new(tokio::runtime::Handle::current());
let task_executor = task_manager.executor();
let ctx = CliContext { task_executor };
Expand Down
1 change: 1 addition & 0 deletions example.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ relays = [
]
extra_data = "hello world"
execution_mnemonic = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
bidding_deadline_ms = 1000
42 changes: 42 additions & 0 deletions mev-build-rs/src/reth_builder/bidder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::reth_builder::{build::Build, error::Error};
use async_trait::async_trait;
use ethereum_consensus::clock::{Clock, SystemTimeProvider};
use std::time::Duration;

pub enum Bid {
Continue,
Done,
}

#[async_trait]
pub trait Bidder {
async fn bid_for(&self, build: &Build) -> Result<Option<Bid>, Error>;
}

/// `DeadlineBidder` submits the best payload *once* at the `deadline`
/// expressed as a `Duration` *before* the start of the build's target slot.
///
/// For example, if the `deadline` is 1 second, then the bidder will return
/// a value to bid one second before the start of the build's target slot.
pub struct DeadlineBidder {
clock: Clock<SystemTimeProvider>,
deadline: Duration,
}

impl DeadlineBidder {
pub fn new(clock: Clock<SystemTimeProvider>, deadline: Duration) -> Self {
Self { clock, deadline }
}
}

#[async_trait]
impl Bidder for DeadlineBidder {
async fn bid_for(&self, build: &Build) -> Result<Option<Bid>, Error> {
let slot = build.context.slot;
let target = self.clock.duration_until_slot(slot);
let duration = target - self.deadline;
tokio::time::sleep(duration).await;

Ok(Some(Bid::Done))
}
}
29 changes: 23 additions & 6 deletions mev-build-rs/src/reth_builder/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ pub struct Build {
pub state: Mutex<State>,
}

type State = PayloadWithPayments;
#[derive(Default, Debug)]
pub struct State {
pub payload_with_payments: PayloadWithPayments,
stopped: bool,
}

impl Build {
pub fn new(context: BuildContext) -> Self {
Expand All @@ -110,7 +114,7 @@ impl Build {

pub fn value(&self) -> U256 {
let state = self.state.lock().unwrap();
state.proposer_payment
state.payload_with_payments.proposer_payment
}

pub fn prepare_bid(
Expand All @@ -121,15 +125,28 @@ impl Build {
) -> Result<(SignedBidSubmission, U256), Error> {
let build_context = &self.context;
let state = self.state.lock().unwrap();
let payload =
state.payload.as_ref().ok_or_else(|| Error::PayloadNotPrepared(build_context.id()))?;
let payment = &state.proposer_payment;
let builder_payment = state.builder_payment;
let payload_with_payments = &state.payload_with_payments;
let payload = payload_with_payments
.payload
.as_ref()
.ok_or_else(|| Error::PayloadNotPrepared(build_context.id()))?;
let payment = &payload_with_payments.proposer_payment;
let builder_payment = payload_with_payments.builder_payment;
Ok((
make_submission(secret_key, public_key, context, build_context, payload, payment)?,
builder_payment,
))
}

pub fn stop(&self) {
let mut state = self.state.lock().unwrap();
state.stopped = true;
}

pub fn is_stopped(&self) -> bool {
let state = self.state.lock().unwrap();
state.stopped
}
}

#[derive(Debug, Default)]
Expand Down
51 changes: 32 additions & 19 deletions mev-build-rs/src/reth_builder/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
ops::Deref,
sync::{Arc, Mutex},
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream};

/// `Builder` builds blocks for proposers registered to connected relays.
Expand Down Expand Up @@ -49,12 +49,14 @@ pub struct Inner<Pool, Client> {
builder_wallet: LocalWallet,

payload_attributes_tx: mpsc::Sender<PayloadBuilderAttributes>,
builds_tx: mpsc::Sender<BuildIdentifier>,
state: Mutex<State>,
}

#[derive(Default, Debug)]
struct State {
payload_attributes_rx: Option<mpsc::Receiver<PayloadBuilderAttributes>>,
builds_rx: Option<mpsc::Receiver<BuildIdentifier>>,
// TODO: merge in `ProposerScheduler` here?
proposer_schedule:
BTreeMap<Slot, HashMap<BlsPublicKey, HashMap<ValidatorPreferences, Vec<RelayIndex>>>>,
Expand All @@ -75,10 +77,12 @@ impl<Pool, Client> Builder<Pool, Client> {
) -> Self {
let public_key = secret_key.public_key();

let (tx, rx) = mpsc::channel::<PayloadBuilderAttributes>(16);
let (attrs_tx, attrs_rx) = mpsc::channel::<PayloadBuilderAttributes>(16);
let (builds_tx, builds_rx) = mpsc::channel::<BuildIdentifier>(16);

let state = State {
payload_attributes_rx: Some(rx),
payload_attributes_rx: Some(attrs_rx),
builds_rx: Some(builds_rx),
proposer_schedule: Default::default(),
builds: Default::default(),
};
Expand All @@ -94,7 +98,8 @@ impl<Pool, Client> Builder<Pool, Client> {
chain_spec,
builder_wallet,
extra_data,
payload_attributes_tx: tx,
payload_attributes_tx: attrs_tx,
builds_tx,
state: Mutex::new(state),
}))
}
Expand Down Expand Up @@ -175,12 +180,26 @@ impl<Pool, Client> Builder<Pool, Client> {
}
}

pub fn stream_builds(&self) -> Result<impl Stream<Item = BuildIdentifier>, Error> {
let mut state = self.state.lock().unwrap();
let rx = state.builds_rx.take();
if let Some(rx) = rx {
Ok(ReceiverStream::new(rx))
} else {
Err(Error::Internal("can only yield payload attributes stream once"))
}
}

pub fn build_for(&self, id: &BuildIdentifier) -> Option<Arc<Build>> {
self.state.lock().unwrap().builds.get(id).cloned()
}

// TODO: pull out into "bidder" component
// TODO: support dynamic bidding, over "static" bidding with fixed percent payment
pub fn stop_build(&self, id: &BuildIdentifier) {
if let Some(build) = self.build_for(id) {
build.stop()
}
}

pub async fn submit_bid(&self, id: &BuildIdentifier) -> Result<(), Error> {
let build = self.build_for(id).ok_or_else(|| Error::MissingBuild(id.clone()))?;

Expand Down Expand Up @@ -357,19 +376,17 @@ impl<Pool: TransactionPool, Client: StateProviderFactory + BlockReaderIdExt> Bui
}

// Drives the build referenced by `id`. Inside a context where blocking is ok.
pub fn construct_best_payload(
&self,
id: &BuildIdentifier,
mut done: oneshot::Receiver<()>,
) -> Result<(), Error> {
pub fn start_build(&self, id: &BuildIdentifier) -> Result<(), Error> {
let build = self.build_for(id).ok_or_else(|| Error::MissingBuild(id.clone()))?;
if let Err(_) = self.builds_tx.blocking_send(id.clone()) {
tracing::warn!(id = ?id, "could not send build to stream of builds, listeners will ignore");
}
loop {
// TODO: pass in `done` to check more frequently...
let current_value = build.value();
match build_payload(&build.context, current_value, &self.client, &self.pool) {
Ok(BuildOutcome::BetterOrEqual(payload_with_payments)) => {
let mut state = build.state.lock().unwrap();
*state = payload_with_payments;
state.payload_with_payments = payload_with_payments;
}
Ok(BuildOutcome::Worse { threshold, provided }) => {
tracing::info!(
Expand All @@ -380,12 +397,8 @@ impl<Pool: TransactionPool, Client: StateProviderFactory + BlockReaderIdExt> Bui
}
Err(err) => tracing::warn!("error building payload: {err}"),
}
match done.try_recv() {
Ok(_) => return Ok(()),
Err(oneshot::error::TryRecvError::Empty) => {}
Err(oneshot::error::TryRecvError::Closed) => {
return Err(Error::UnexpectedChannelClosure)
}
if build.is_stopped() {
return Ok(())
}
// TODO: handle backpressure on how frequently we go to build...
std::thread::sleep(std::time::Duration::from_secs(1))
Expand Down
3 changes: 2 additions & 1 deletion mev-build-rs/src/reth_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// Build payloads suitable for submission to `mev-boost` relays
/// using `reth` as an execution client.
mod bidder;
mod build;
mod builder;
mod error;
Expand All @@ -9,5 +10,5 @@ mod reth_ext;
mod service;
mod types;

pub use builder::Builder;
pub use bidder::DeadlineBidder;
pub use service::{Config, Service};
2 changes: 1 addition & 1 deletion mev-build-rs/src/reth_builder/reth_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
///
/// This module essentially implements a "no-op" builder from the point of view of `reth`,
/// and provides a touch point to signal new payload attributes to this crate's builder.
use crate::reth_builder::Builder;
use crate::reth_builder::builder::Builder;
use reth_payload_builder::{
error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes,
PayloadId, PayloadJob, PayloadJobGenerator,
Expand Down
Loading

0 comments on commit 96c8c5b

Please sign in to comment.