Skip to content

Commit

Permalink
Merge pull request #226 from ralexstokes/integrate-kurtosis
Browse files Browse the repository at this point in the history
debug #222
  • Loading branch information
ralexstokes authored May 1, 2024
2 parents 91f416c + 72d1459 commit 08c20a3
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 54 deletions.
5 changes: 4 additions & 1 deletion mev-boost-rs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ impl Service {
let relay_task = tokio::spawn(async move {
let relay_mux = relay_mux_clone;
let mut slots = clock.clone().into_stream();
let mut current_epoch = clock.current_epoch().expect("after genesis");

// NOTE: this will block until genesis if we are before the genesis time
let current_slot = slots.next().await.expect("some next slot");
let mut current_epoch = clock.epoch_for(current_slot);

while let Some(slot) = slots.next().await {
relay_mux.on_slot(slot);
Expand Down
8 changes: 5 additions & 3 deletions mev-build-rs/src/auctioneer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,16 @@ impl<
// TODO: send more dynamic updates
// by the time the bidder submits a value the best payload may have
// already changed
tx.send(Ok(payload.fees())).expect("can send");
tx.send(Some(payload.fees())).expect("can send");
return
}
Err(err) => warn!(%err, "could not get best payload from payload store"),
}
}
// fallback
tx.send(Err(Error::MissingPayload(payload_id))).expect("can send");
// NOTE: if no payload was found, the auction has been terminated
if let Err(err) = tx.send(None) {
warn!(?err, "could not send after failure to retrieve payload");
}
}
BidderMessage::Dispatch { payload_id, value: _value, keep_alive: _keep_alive } => {
// TODO: forward keep alive signal to builder
Expand Down
1 change: 1 addition & 0 deletions mev-build-rs/src/bidder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use strategies::Config;
/// Do we expect to submit more bids or not?
#[derive(Debug, Clone, Copy)]
pub enum KeepAlive {
No,
Yes,
}

Expand Down
16 changes: 5 additions & 11 deletions mev-build-rs/src/bidder/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
auctioneer::AuctionContext,
bidder::{strategies::BasicStrategy, Bid, Config, KeepAlive},
Error,
};
use ethereum_consensus::clock::duration_until;
use reth::{
Expand All @@ -15,7 +14,6 @@ use tokio::{
},
time::{sleep, timeout},
};
use tracing::debug;

/// All bidding routines stop this many seconds *after* the timestamp of the proposal
/// regardless of what the bidding strategy suggests
Expand All @@ -24,7 +22,7 @@ pub const DEFAULT_BIDDING_DEADLINE_AFTER_SLOT: u64 = 1;
pub enum Message {
NewAuction(Arc<AuctionContext>),
Dispatch { payload_id: PayloadId, value: U256, keep_alive: KeepAlive },
RevenueQuery(PayloadId, oneshot::Sender<Result<U256, Error>>),
RevenueQuery(PayloadId, oneshot::Sender<Option<U256>>),
}

pub struct Service {
Expand Down Expand Up @@ -64,14 +62,10 @@ impl Service {
let message = Message::RevenueQuery(payload_id, tx);
auctioneer.send(message).await.expect("can send");
let current_revenue = match rx.await.expect("can recv") {
Ok(fees) => fees,
Err(err) => {
// NOTE: if there was an error, try to fetch
// again without running a strategy
// TODO: handle case when the auction has terminated and we should
// also terminate
debug!(%err, "could not get current revenue; trying again");
continue
Some(fees) => fees,
None => {
// auction has ended
break
}
};

Expand Down
20 changes: 4 additions & 16 deletions mev-build-rs/src/bidder/strategies/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use ethereum_consensus::clock::duration_until;
use reth::{api::PayloadBuilderAttributes, primitives::U256};
use serde::Deserialize;
use std::time::Duration;
use tokio::time::{interval, Interval};

pub const DEFAULT_BID_INTERVAL: u64 = 1;

#[derive(Deserialize, Debug, Default, Clone)]
pub struct Config {
Expand All @@ -22,25 +19,19 @@ pub struct Config {
pub subsidy_wei: Option<U256>,
}

/// `DeadlineBidder` submits the best payload *once* at the `deadline`
/// expressed as a `Duration` *before* the start of the build's target slot.
///
/// For example, if the `deadline` is 1 second, then the bidder will return
/// a value to bid one second before the start of the build's target slot.
/// `BasicStrategy` submits a bid once, waiting until a duration of `wait_until`
/// before the start of the target slot.
pub struct BasicStrategy {
wait_until: Duration,
bid_interval: Interval,
bid_percent: f64,
subsidy_wei: U256,
}

impl BasicStrategy {
pub fn new(config: &Config) -> Self {
let wait_until = Duration::from_millis(config.wait_until_ms);
let bid_interval = interval(Duration::from_secs(DEFAULT_BID_INTERVAL));
Self {
wait_until,
bid_interval,
bid_percent: config.bid_percent.unwrap_or(1.0).clamp(0.0, 1.0),
subsidy_wei: config.subsidy_wei.unwrap_or(U256::ZERO),
}
Expand All @@ -60,11 +51,8 @@ impl BasicStrategy {
return Bid::Wait(wait_until)
}

// If we are near the auction deadline, start submitting bids
// with one bid per tick of the interval
self.bid_interval.tick().await;

// Then, we submit our bid
let value = self.compute_value(current_revenue);
Bid::Submit { value, keep_alive: KeepAlive::Yes }
Bid::Submit { value, keep_alive: KeepAlive::No }
}
}
4 changes: 1 addition & 3 deletions mev-build-rs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy_signer_wallet::WalletError;
use ethereum_consensus::Error as ConsensusError;
use reth::payload::{error::PayloadBuilderError, PayloadId};
use reth::payload::error::PayloadBuilderError;
use thiserror::Error;

#[derive(Error, Debug)]
Expand All @@ -11,6 +11,4 @@ pub enum Error {
PayloadBuilderError(#[from] PayloadBuilderError),
#[error(transparent)]
WalletError(#[from] WalletError),
#[error("could not find payload {0}")]
MissingPayload(PayloadId),
}
35 changes: 16 additions & 19 deletions mev-build-rs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ use reth::{
use reth_db::DatabaseEnv;
use serde::Deserialize;
use std::{path::PathBuf, sync::Arc};
use tokio::{
sync::{
broadcast::{self, Sender},
mpsc,
},
time::sleep,
use tokio::sync::{
broadcast::{self, Sender},
mpsc,
};
use tokio_stream::StreamExt;
use tracing::warn;
Expand Down Expand Up @@ -61,7 +58,6 @@ pub struct Services<
pub bidder: Bidder,
pub clock: SystemClock,
pub clock_tx: Sender<ClockMessage>,
pub context: Arc<Context>,
}

pub async fn construct_services<
Expand Down Expand Up @@ -91,13 +87,13 @@ pub async fn construct_services<
bidder_tx,
bid_dispatch_rx,
config.auctioneer,
context.clone(),
context,
genesis_time,
);

let bidder = Bidder::new(bidder_rx, bid_dispatch_tx, task_executor, config.bidder);

Ok(Services { auctioneer, bidder, clock, clock_tx, context })
Ok(Services { auctioneer, bidder, clock, clock_tx })
}

fn custom_network_from_config_directory(path: PathBuf) -> Network {
Expand Down Expand Up @@ -139,26 +135,27 @@ pub async fn launch(

let task_executor = handle.node.task_executor.clone();
let payload_builder = handle.node.payload_builder.clone();
let Services { auctioneer, bidder, clock, clock_tx, context } =
let Services { auctioneer, bidder, clock, clock_tx } =
construct_services(network, config, task_executor, payload_builder).await?;

handle.node.task_executor.spawn_critical_blocking("mev-builder/auctioneer", auctioneer.spawn());
handle.node.task_executor.spawn_critical_blocking("mev-builder/bidder", bidder.spawn());
handle.node.task_executor.spawn_critical("mev-builder/clock", async move {
if clock.before_genesis() {
let duration = clock.duration_until_next_slot();
warn!(?duration, "waiting until genesis");
sleep(duration).await;
}
let mut slots = clock.clone().into_stream();

// NOTE: this will block until genesis if we are before the genesis time
let current_slot = slots.next().await.expect("some next slot");
let mut current_epoch = clock.epoch_for(current_slot);

// TODO: block on sync here to avoid spurious first PA?

let mut current_epoch = clock.current_epoch().expect("past genesis");
clock_tx.send(ClockMessage::NewEpoch(current_epoch)).expect("can send");
if let Err(err) = clock_tx.send(ClockMessage::NewEpoch(current_epoch)) {
let msg = err.0;
warn!(?msg, "could not update receivers with new epoch");
}

let mut slots = clock.into_stream();
while let Some(slot) = slots.next().await {
let epoch = slot / context.slots_per_epoch;
let epoch = clock.epoch_for(slot);
if epoch > current_epoch {
current_epoch = epoch;
if let Err(err) = clock_tx.send(ClockMessage::NewEpoch(epoch)) {
Expand Down
6 changes: 5 additions & 1 deletion mev-relay-rs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ impl Service {
let relay = tokio::spawn(async move {
let mut slots = clock.clone().into_stream();

let mut current_epoch = clock.current_epoch().expect("after genesis");
// NOTE: this will block until genesis if we are before the genesis time
let current_slot = slots.next().await.expect("some next slot");
let mut current_epoch = clock.epoch_for(current_slot);

relay.on_epoch(current_epoch).await;

while let Some(slot) = slots.next().await {
let epoch = clock.epoch_for(slot);
if epoch > current_epoch {
Expand Down

0 comments on commit 08c20a3

Please sign in to comment.