diff --git a/Cargo.lock b/Cargo.lock index fe398faf..1a4885f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4539,6 +4539,7 @@ dependencies = [ "ssz_rs", "thiserror", "tokio", + "tokio-stream", "tracing", "url", ] diff --git a/mev-build-rs/Cargo.toml b/mev-build-rs/Cargo.toml index e46ddeb1..7bb5c610 100644 --- a/mev-build-rs/Cargo.toml +++ b/mev-build-rs/Cargo.toml @@ -8,6 +8,7 @@ license = "MIT OR Apache-2.0" [dependencies] tokio = { version = "1.0", features = ["full"] } +tokio-stream = "0.1.14" tracing = "0.1" futures = "0.3.21" async-trait = "0.1.53" @@ -24,6 +25,7 @@ ssz_rs = { workspace = true } mev-rs = { path = "../mev-rs" } + revm = { workspace = true } reth-payload-builder = { workspace = true } reth-primitives = { workspace = true } diff --git a/mev-build-rs/src/mempool_builder/builder/build.rs b/mev-build-rs/src/mempool_builder/builder/build.rs new file mode 100644 index 00000000..6ccb415f --- /dev/null +++ b/mev-build-rs/src/mempool_builder/builder/build.rs @@ -0,0 +1,200 @@ +use crate::mempool_builder::builder::{Error, RelayIndex}; +use ethereum_consensus::{ + capella::mainnet as spec, + crypto::{hash, SecretKey}, + primitives::{BlsPublicKey, Bytes32, ExecutionAddress, Slot}, + ssz::{ByteList, ByteVector}, + state_transition::Context, +}; +use ethers::signers::LocalWallet; +use mev_rs::{ + signing::sign_builder_message, + types::{capella, BidTrace, ExecutionPayload, SignedBidSubmission}, +}; +use reth_primitives::{Bloom, Bytes, ChainSpec, SealedBlock, Withdrawal, H160, H256, U256}; +use revm::primitives::{BlockEnv, CfgEnv}; +use ssz_rs::prelude::*; +use std::sync::{Arc, Mutex}; + +pub type BuildIdentifier = Bytes32; + +fn to_bytes32(value: H256) -> Bytes32 { + Bytes32::try_from(value.as_bytes()).unwrap() +} + +fn to_bytes20(value: H160) -> ExecutionAddress { + ExecutionAddress::try_from(value.as_bytes()).unwrap() +} + +fn to_byte_vector(value: Bloom) -> ByteVector<256> { + ByteVector::<256>::try_from(value.as_bytes()).unwrap() +} + +fn to_u256(value: &U256) -> ssz_rs::U256 { + ssz_rs::U256::try_from_bytes_le(&value.to_le_bytes::<32>()).unwrap() +} + +fn to_execution_payload(value: &SealedBlock) -> ExecutionPayload { + let hash = value.hash(); + let header = &value.header; + let transactions = &value.body; + let withdrawals = &value.withdrawals; + let transactions = transactions + .iter() + .map(|t| spec::Transaction::try_from(t.envelope_encoded().as_ref()).unwrap()) + .collect::>(); + let withdrawals = withdrawals + .as_ref() + .unwrap() + .iter() + .map(|w| spec::Withdrawal { + index: w.index as usize, + validator_index: w.validator_index as usize, + address: to_bytes20(w.address), + amount: w.amount, + }) + .collect::>(); + + let payload = capella::ExecutionPayload { + parent_hash: to_bytes32(header.parent_hash), + fee_recipient: to_bytes20(header.beneficiary), + state_root: to_bytes32(header.state_root), + receipts_root: to_bytes32(header.receipts_root), + logs_bloom: to_byte_vector(header.logs_bloom), + prev_randao: to_bytes32(header.mix_hash), + block_number: header.number, + gas_limit: header.gas_limit, + gas_used: header.gas_used, + timestamp: header.timestamp, + extra_data: ByteList::try_from(header.extra_data.as_ref()).unwrap(), + base_fee_per_gas: ssz_rs::U256::from(header.base_fee_per_gas.unwrap_or_default()), + block_hash: to_bytes32(hash), + transactions: TryFrom::try_from(transactions).unwrap(), + withdrawals: TryFrom::try_from(withdrawals).unwrap(), + }; + ExecutionPayload::Capella(payload) +} + +fn make_submission( + signing_key: &SecretKey, + builder_public_key: &BlsPublicKey, + context: &Context, + build_context: &BuildContext, + payload: &SealedBlock, + payment: &U256, +) -> Result { + let mut message = BidTrace { + slot: build_context.slot, + parent_hash: to_bytes32(payload.parent_hash), + block_hash: to_bytes32(payload.hash), + builder_public_key: builder_public_key.clone(), + proposer_public_key: build_context.proposer.clone(), + proposer_fee_recipient: build_context.proposer_fee_recipient.clone(), + gas_limit: payload.gas_limit, + gas_used: payload.gas_used, + value: to_u256(&payment), + }; + let execution_payload = match to_execution_payload(payload) { + ExecutionPayload::Bellatrix(_) => unimplemented!(), + ExecutionPayload::Capella(payload) => payload, + ExecutionPayload::Deneb(_) => unimplemented!(), + }; + let signature = sign_builder_message(&mut message, signing_key, context)?; + Ok(SignedBidSubmission { message, execution_payload, signature }) +} + +#[derive(Debug, Clone)] +pub struct BuildContext { + pub slot: Slot, + pub parent_hash: H256, + pub proposer: BlsPublicKey, + pub timestamp: u64, + pub proposer_fee_recipient: ExecutionAddress, + pub prev_randao: H256, + pub withdrawals: Vec, + pub relays: Vec, + pub chain_spec: Arc, + pub block_env: BlockEnv, + pub cfg_env: CfgEnv, + pub extra_data: Bytes, + pub parent_block: Arc, + pub builder_wallet: LocalWallet, + // Amount of gas to reserve after building a payload + // e.g. used for end-of-block proposer payments + pub gas_reserve: u64, + // Amount of the block's value to bid to the proposer + pub bid_percent: f64, + // Amount to add to the block's value to bid to the proposer + pub subsidy: U256, +} + +pub fn compute_build_id(slot: Slot, parent_hash: H256, proposer: &BlsPublicKey) -> BuildIdentifier { + let mut data = Vec::with_capacity(88); + slot.serialize(&mut data).expect("can serialize"); + parent_hash.serialize(&mut data).expect("can serialize"); + proposer.serialize(&mut data).expect("can serialize"); + hash(data) +} + +impl BuildContext { + pub fn id(&self) -> BuildIdentifier { + compute_build_id(self.slot, self.parent_hash, &self.proposer) + } + + pub fn base_fee(&self) -> u64 { + self.block_env.basefee.to::() + } + + pub fn number(&self) -> u64 { + self.block_env.number.to::() + } + + pub fn gas_limit(&self) -> u64 { + self.block_env.gas_limit.try_into().unwrap_or(u64::MAX) + } +} + +#[derive(Debug)] +pub struct Build { + pub context: BuildContext, + pub state: Mutex, +} + +type State = PayloadWithPayments; + +impl Build { + pub fn new(context: BuildContext, payload_with_payments: PayloadWithPayments) -> Self { + Self { context, state: Mutex::new(payload_with_payments) } + } + + pub fn value(&self) -> U256 { + let state = self.state.lock().unwrap(); + state.proposer_payment + } + + pub fn prepare_bid( + &self, + secret_key: &SecretKey, + public_key: &BlsPublicKey, + context: &Context, + ) -> Result<(SignedBidSubmission, U256), Error> { + let build_context = &self.context; + let state = self.state.lock().unwrap(); + let payload = &state.payload; + let payment = &state.proposer_payment; + let builder_payment = state.builder_payment; + Ok(( + make_submission(secret_key, public_key, context, build_context, payload, payment)?, + builder_payment, + )) + } +} + +#[derive(Debug)] +pub struct PayloadWithPayments { + // TODO: refactor here to `Option<_>` + // can migrate to "take" pattern and minimize lock contention + pub payload: SealedBlock, + pub proposer_payment: U256, + pub builder_payment: U256, +} diff --git a/mev-build-rs/src/mempool_builder/builder/compat.rs b/mev-build-rs/src/mempool_builder/builder/compat.rs deleted file mode 100644 index 93a36a02..00000000 --- a/mev-build-rs/src/mempool_builder/builder/compat.rs +++ /dev/null @@ -1,65 +0,0 @@ -use ethereum_consensus::{ - capella::mainnet as spec, - primitives::{Bytes32, ExecutionAddress}, - ssz::{ByteList, ByteVector}, -}; -use mev_rs::types::{capella, ExecutionPayload}; -use reth_primitives::{Bloom, SealedBlock, H160, H256, U256}; -use ssz_rs::prelude::*; - -pub fn to_bytes32(value: H256) -> Bytes32 { - Bytes32::try_from(value.as_bytes()).unwrap() -} - -pub fn to_bytes20(value: H160) -> ExecutionAddress { - ExecutionAddress::try_from(value.as_bytes()).unwrap() -} - -pub fn to_byte_vector(value: Bloom) -> ByteVector<256> { - ByteVector::<256>::try_from(value.as_bytes()).unwrap() -} - -pub fn to_u256(value: U256) -> ssz_rs::U256 { - ssz_rs::U256::try_from_bytes_le(&value.to_le_bytes::<32>()).unwrap() -} - -pub fn to_execution_payload(value: &SealedBlock) -> ExecutionPayload { - let hash = value.hash(); - let header = &value.header; - let transactions = &value.body; - let withdrawals = &value.withdrawals; - let transactions = transactions - .iter() - .map(|t| spec::Transaction::try_from(t.envelope_encoded().as_ref()).unwrap()) - .collect::>(); - let withdrawals = withdrawals - .as_ref() - .unwrap() - .iter() - .map(|w| spec::Withdrawal { - index: w.index as usize, - validator_index: w.validator_index as usize, - address: to_bytes20(w.address), - amount: w.amount, - }) - .collect::>(); - - let payload = capella::ExecutionPayload { - parent_hash: to_bytes32(header.parent_hash), - fee_recipient: to_bytes20(header.beneficiary), - state_root: to_bytes32(header.state_root), - receipts_root: to_bytes32(header.receipts_root), - logs_bloom: to_byte_vector(header.logs_bloom), - prev_randao: to_bytes32(header.mix_hash), - block_number: header.number, - gas_limit: header.gas_limit, - gas_used: header.gas_used, - timestamp: header.timestamp, - extra_data: ByteList::try_from(header.extra_data.as_ref()).unwrap(), - base_fee_per_gas: ssz_rs::U256::from(header.base_fee_per_gas.unwrap_or_default()), - block_hash: to_bytes32(hash), - transactions: TryFrom::try_from(transactions).unwrap(), - withdrawals: TryFrom::try_from(withdrawals).unwrap(), - }; - ExecutionPayload::Capella(payload) -} diff --git a/mev-build-rs/src/mempool_builder/builder/mod.rs b/mev-build-rs/src/mempool_builder/builder/mod.rs index 34d0f01a..a9c9a4c0 100644 --- a/mev-build-rs/src/mempool_builder/builder/mod.rs +++ b/mev-build-rs/src/mempool_builder/builder/mod.rs @@ -1,99 +1,47 @@ -/// Contains logic to build payloads suitable for submission -/// to `mev-boost` relays using `reth` as an execution client. -/// Payload building logic is heavily inspired by -/// the `reth-basic-payload-builder` package in the `reth` codebase. -mod compat; +/// Build payloads suitable for submission to `mev-boost` relays +/// using `reth` as an execution client. +mod build; mod payload_builder; -mod payload_job_generator; +mod reth_ext; -use compat::*; +pub(crate) use build::*; use payload_builder::*; use ethereum_consensus::{ - crypto::{hash, SecretKey}, - primitives::{BlsPublicKey, Bytes32, Epoch, ExecutionAddress, Hash32, Slot}, + crypto::SecretKey, + primitives::{BlsPublicKey, Epoch, ExecutionAddress, Slot}, state_transition::{Context, Error as ConsensusError}, }; use ethers::signers::{LocalWallet, Signer}; -use mev_rs::{ - blinded_block_relayer::BlindedBlockRelayer, - signing::sign_builder_message, - types::{BidTrace, ExecutionPayload, ProposerSchedule, SignedBidReceipt, SignedBidSubmission}, - Relay, -}; +use mev_rs::{blinded_block_relayer::BlindedBlockRelayer, types::ProposerSchedule, Relay}; use reth_interfaces::Error as RethError; use reth_payload_builder::PayloadBuilderAttributes; -use reth_primitives::{BlockNumberOrTag, Bytes, ChainSpec, SealedBlock, Withdrawal, H256, U256}; +use reth_primitives::{BlockNumberOrTag, Bytes, ChainSpec, H256, U256}; use reth_provider::BlockSource; -use revm::primitives::{BlockEnv, CfgEnv, EVMError}; +use revm::primitives::EVMError; use ssz_rs::prelude::*; use std::{ - cmp, collections::{BTreeMap, HashMap}, ops::Deref, sync::{Arc, Mutex}, }; use thiserror::Error; -use tokio::sync::{mpsc::Sender, oneshot}; - -#[derive(Default, PartialEq, Eq, Debug, Clone)] -pub struct PayloadWithFees { - payload: SealedBlock, - // amount this payload will pay the proposer - fees: U256, - // amount this payload will pay the builder - revenue: U256, -} - -impl cmp::PartialOrd for PayloadWithFees { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.fees.cmp(&other.fees)) - } -} - -impl cmp::Ord for PayloadWithFees { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.partial_cmp(other).expect("always returns some cmp") - } -} +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::{wrappers::ReceiverStream, Stream}; -fn make_submission( - signing_key: &SecretKey, - builder_public_key: &BlsPublicKey, - context: &Context, - build_context: &BuildContext, - payload: &SealedBlock, - payment: U256, -) -> Result { - let mut message = BidTrace { - slot: build_context.slot, - parent_hash: to_bytes32(payload.parent_hash), - block_hash: to_bytes32(payload.hash), - builder_public_key: builder_public_key.clone(), - proposer_public_key: build_context.proposer.clone(), - proposer_fee_recipient: build_context.proposer_fee_recipient.clone(), - gas_limit: payload.gas_limit, - gas_used: payload.gas_used, - value: to_u256(payment), - }; - let execution_payload = match to_execution_payload(payload) { - ExecutionPayload::Capella(payload) => payload, - ExecutionPayload::Bellatrix(_) => unimplemented!(), - }; - let signature = sign_builder_message(&mut message, signing_key, context)?; - Ok(SignedBidSubmission { message, execution_payload, signature }) -} +pub(crate) type RelayIndex = usize; +type ValidatorPreference = (ExecutionAddress, u64); #[derive(Error, Debug)] pub enum Error { #[error("no validators registered for slot {0}")] NoRegisteredValidatorsForSlot(Slot), #[error("already building for identifier {0:?}")] - DuplicateJobRequest(BuildIdentifier), + DuplicatebuildRequest(BuildIdentifier), #[error("channel was unexpectedly closed")] UnexpectedChannelClosure, - #[error("missing a job request with identifier {0}")] - MissingJob(BuildIdentifier), + #[error("missing a build request with identifier {0}")] + MissingBuild(BuildIdentifier), #[error("missing parent block {0}")] MissingParentBlock(H256), #[error("{0}")] @@ -104,19 +52,21 @@ pub enum Error { Reth(#[from] RethError), #[error("evm execution error: {0:?}")] Execution(EVMError), + #[error("{0}")] + Internal(&'static str), } /// `Builder` builds blocks for proposers registered to connected relays. /// Block contents are sourced from the public mempool. #[derive(Clone, Debug)] -pub struct Builder(Arc>); +pub struct Builder(Arc>); // `Send` and `Sync` so we can have builder implement the required `reth` payload builder traits. -unsafe impl Send for Builder {} -unsafe impl Sync for Builder {} +unsafe impl Send for Builder {} +unsafe impl Sync for Builder {} -impl Deref for Builder { - type Target = Inner; +impl Deref for Builder { + type Target = Inner; fn deref(&self) -> &Self::Target { &self.0 @@ -124,109 +74,62 @@ impl Deref for Builder { } #[derive(Debug)] -pub struct Inner { +pub struct Inner { secret_key: SecretKey, public_key: BlsPublicKey, context: Arc, - payload_attributes_tx: Sender, relays: Vec, pool: Pool, - provider: Provider, + client: Client, chain_spec: Arc, extra_data: Bytes, builder_wallet: LocalWallet, + payload_attributes_tx: mpsc::Sender, state: Mutex, } -type RelayIndex = usize; -type ValidatorPreferences = (ExecutionAddress, u64); - -pub type BuildIdentifier = Bytes32; - -#[derive(Debug, Clone)] -pub struct BuildContext { - pub slot: Slot, - pub parent_hash: H256, - pub proposer: BlsPublicKey, - pub timestamp: u64, - pub proposer_fee_recipient: ExecutionAddress, - pub prev_randao: Hash32, - pub withdrawals: Vec, - pub relays: Vec, - pub chain_spec: Arc, - pub block_env: BlockEnv, - pub cfg_env: CfgEnv, - pub extra_data: Bytes, - pub parent_block: Arc, - pub builder_wallet: LocalWallet, - // Amount of gas to reserve after building a payload - // e.g. used for end-of-block proposer payments - pub gas_reserve: u64, - // Amount of the block's value to bid to the proposer - pub bid_percent: f64, - // Amount to add to the block's value to bid to the proposer - pub subsidy: U256, -} - -impl BuildContext { - pub fn id(&self) -> BuildIdentifier { - let mut data = Vec::with_capacity(88); - self.slot.serialize(&mut data).expect("can serialize"); - self.parent_hash.serialize(&mut data).expect("can serialize"); - self.proposer.serialize(&mut data).expect("can serialize"); - hash(data) - } - - pub fn base_fee(&self) -> u64 { - self.block_env.basefee.to::() - } - - pub fn number(&self) -> u64 { - self.block_env.number.to::() - } - - pub fn gas_limit(&self) -> u64 { - self.block_env.gas_limit.try_into().unwrap_or(u64::MAX) - } -} - #[derive(Default, Debug)] struct State { + payload_attributes_rx: Option>, + // TODO: merge in `ProposerScheduler` here? proposer_schedule: - BTreeMap>>>, - jobs: HashMap, Arc)>, + BTreeMap>>>, + builds: HashMap>, } -impl< - Pool: reth_transaction_pool::TransactionPool, - Provider: reth_provider::StateProviderFactory + reth_provider::BlockReaderIdExt, - > Builder -{ +impl Builder { pub fn new( secret_key: SecretKey, context: Arc, - payload_attributes_tx: Sender, relays: Vec, pool: Pool, - provider: Provider, + client: Client, chain_spec: Arc, extra_data: Bytes, builder_wallet: LocalWallet, ) -> Self { let public_key = secret_key.public_key(); + let (tx, rx) = mpsc::channel::(16); + + let state = State { + payload_attributes_rx: Some(rx), + proposer_schedule: Default::default(), + builds: Default::default(), + }; + Self(Arc::new(Inner { secret_key, public_key, context, - payload_attributes_tx, relays, pool, - provider, + client, chain_spec, builder_wallet, extra_data, - state: Default::default(), + payload_attributes_tx: tx, + state: Mutex::new(state), })) } @@ -279,62 +182,89 @@ impl< } tracing::info!("processing slot {slot}"); - // TODO: consider handling elsewhere to not block `state` during critical time at slot - // boundary + // TODO: consider check lock contention let mut state = self.state.lock().unwrap(); if let Some((earliest_slot, _)) = state.proposer_schedule.first_key_value() { for entry in *earliest_slot..slot { state.proposer_schedule.remove(&entry); } } - state.jobs.retain(|_, (context, _)| context.slot >= slot - 1); + state.builds.retain(|_, build| build.context.slot >= slot - 1); } - // if `process_payload_attributes` identifies a chance to build a payload - // for the incoming `payload_attributes`, it will construct the required context - // internally (including an "empty" payload ready for submission), and return - // a unique `BuildIdentifier` that can be used to sample the job. - pub async fn process_payload_attributes( - &self, - slot: Slot, - payload_attributes: PayloadBuilderAttributes, - ) -> Result { - let payload_id = payload_attributes.payload_id(); - tracing::trace!("got payload attributes to build {payload_id:?} in slot {slot}"); + pub fn on_payload_attributes(&self, payload_attributes: PayloadBuilderAttributes) { + if let Err(err) = self.payload_attributes_tx.blocking_send(payload_attributes) { + tracing::warn!(err = ?err, "could not accept payload attributes"); + } + } + pub fn stream_payload_attributes( + &self, + ) -> Result, Error> { let mut state = self.state.lock().unwrap(); - let eligible_proposals = state - .proposer_schedule - .get(&slot) - .ok_or_else(|| Error::NoRegisteredValidatorsForSlot(slot))?; + let rx = state.payload_attributes_rx.take(); + if let Some(rx) = rx { + Ok(ReceiverStream::new(rx)) + } else { + Err(Error::Internal("can only yield payload attributes stream once")) + } + } - // TODO: should defer to our own view of consensus: - // currently, if there is more than one element in `eligible_proposals` - // then there are multiple views across our relay set... - // let's simplify the return type here by picking the "majority view"... - let (proposer, preferences) = eligible_proposals - .iter() - .max_by(|(_, relay_set_a), (_, relay_set_b)| relay_set_a.len().cmp(&relay_set_b.len())) - .ok_or_else(|| Error::NoRegisteredValidatorsForSlot(slot))?; - // TODO: think about handling divergent relay views - // similarly, let's just service the "majority" relays for now... - let (preference, relays) = preferences - .iter() - .max_by(|(_, relay_set_a), (_, relay_set_b)| relay_set_a.len().cmp(&relay_set_b.len())) - .ok_or_else(|| Error::NoRegisteredValidatorsForSlot(slot))?; + pub fn build_for(&self, id: &BuildIdentifier) -> Option> { + self.state.lock().unwrap().builds.get(id).map(Clone::clone) + } - let parent_hash = payload_attributes.parent; + // TODO: pull out into "bidder" component + // TODO: support dynamic bidding, over "static" bidding with fixed percent payment + pub async fn submit_bid(&self, id: &BuildIdentifier) -> Result<(), Error> { + let build = self.build_for(id).ok_or_else(|| Error::MissingBuild(id.clone()))?; + + let context = &build.context; + + let (signed_submission, builder_payment) = + build.prepare_bid(&self.secret_key, &self.public_key, &self.context)?; + + // TODO: make calls concurrently + let relays = &context.relays; + for index in relays { + let relay = &self.relays[*index]; + tracing::info!(id = ?id, proposer_payment = ?signed_submission.message.value, builder_payment = ?builder_payment, relay = ?relay, bid = ?signed_submission, "submitting bid"); + match relay.submit_bid(&signed_submission).await { + Ok(_) => tracing::info!(id = ?id, relay = ?relay, "successfully submitted bid"), + Err(err) => { + tracing::warn!(err = ?err, id = ?id, relay = ?relay, "error submitting bid"); + } + } + } + + Ok(()) + } +} - // construct build context... +impl< + Pool: reth_transaction_pool::TransactionPool, + Client: reth_provider::StateProviderFactory + reth_provider::BlockReaderIdExt, + > Builder +{ + // NOTE: this is held inside a lock currently, minimize work here + fn construct_build_context( + &self, + slot: Slot, + parent_hash: H256, + proposer: &BlsPublicKey, + payload_attributes: PayloadBuilderAttributes, + validator_preference: &ValidatorPreference, + relays: &[RelayIndex], + ) -> Result { let parent_block = if parent_hash.is_zero() { // use latest block if parent is zero: genesis block - self.provider + self.client .block_by_number_or_tag(BlockNumberOrTag::Latest)? .ok_or_else(|| Error::MissingParentBlock(payload_attributes.parent))? .seal_slow() } else { let block = self - .provider + .client .find_block_by_hash(payload_attributes.parent, BlockSource::Any)? .ok_or_else(|| Error::MissingParentBlock(payload_attributes.parent))?; @@ -347,19 +277,21 @@ impl< payload_attributes.cfg_and_block_env(&self.chain_spec, &parent_block); // TODO: ensure this matches the protocol's tolerance - block_env.gas_limit = U256::from(preference.1); - // NOTE: fee collection strategy: drive all fees to builder + block_env.gas_limit = U256::from(validator_preference.1); + + // TODO: configurable "fee collection strategy" + // fee collection strategy: drive all fees to builder block_env.coinbase = self.builder_wallet.address().into(); - let build_context = BuildContext { + let context = BuildContext { slot, parent_hash, proposer: proposer.clone(), timestamp: payload_attributes.timestamp, - proposer_fee_recipient: preference.0.clone(), - prev_randao: to_bytes32(payload_attributes.prev_randao), + proposer_fee_recipient: validator_preference.0.clone(), + prev_randao: payload_attributes.prev_randao, withdrawals: payload_attributes.withdrawals, - relays: relays.clone(), + relays: relays.into(), chain_spec: self.chain_spec.clone(), block_env, cfg_env, @@ -370,46 +302,87 @@ impl< bid_percent: 0.9, subsidy: U256::from(10u64).pow(U256::from(18)), }; + Ok(context) + } - let build_identifier = build_context.id(); + // Determine if a new build build should be created for the given context + // fixed by `slot` and `payload_attributes`. + // If a new build should be created, then do so and return the unique identifier + // to the caller. If no new build should be created, `None` is returned. + pub fn process_payload_attributes( + &self, + slot: Slot, + payload_attributes: PayloadBuilderAttributes, + ) -> Result, Error> { + let mut state = self.state.lock().unwrap(); + let eligible_proposals = state + .proposer_schedule + .get(&slot) + .ok_or_else(|| Error::NoRegisteredValidatorsForSlot(slot))?; - if state.jobs.contains_key(&build_identifier) { - Err(Error::DuplicateJobRequest(build_identifier)) - } else { - // TODO: issue holding lock for this long? - let payload = - build_payload_with_no_transactions(&build_context, &self.provider, &self.pool)?; - state - .jobs - .insert(build_identifier.clone(), (Arc::new(build_context), Arc::new(payload))); - Ok(build_identifier) + // TODO: should defer to our own view of consensus: + // currently, if there is more than one element in `eligible_proposals` + // then there are multiple views across our relay set... + // let's simplify the return type here by picking the "majority view"... + let (proposer, preferences) = eligible_proposals + .iter() + .max_by(|(_, relay_set_a), (_, relay_set_b)| relay_set_a.len().cmp(&relay_set_b.len())) + .ok_or_else(|| Error::NoRegisteredValidatorsForSlot(slot))?; + // TODO: think about handling divergent relay views + // similarly, let's just service the "majority" relays for now... + let (validator_preference, relays) = preferences + .iter() + .max_by(|(_, relay_set_a), (_, relay_set_b)| relay_set_a.len().cmp(&relay_set_b.len())) + .ok_or_else(|| Error::NoRegisteredValidatorsForSlot(slot))?; + + let parent_hash = payload_attributes.parent; + let build_identifier = compute_build_id(slot, parent_hash, proposer); + + if state.builds.contains_key(&build_identifier) { + return Ok(None) } + + let context = self.construct_build_context( + slot, + parent_hash, + proposer, + payload_attributes, + validator_preference, + relays, + )?; + + let payload_with_payments = + build_payload_with_no_transactions(&context, &self.client, &self.pool)?; + + let build = Arc::new(Build::new(context, payload_with_payments)); + state.builds.insert(build_identifier.clone(), build); + Ok(Some(build_identifier)) } - pub fn build( + // Drives the build referenced by `id`. Inside a context where blocking is ok. + pub fn construct_best_payload( &self, - build_identifier: &BuildIdentifier, + id: &BuildIdentifier, mut done: oneshot::Receiver<()>, ) -> Result<(), Error> { - let (context, mut best_payload) = { - let state = self.state.lock().unwrap(); - let job = state.jobs.get(build_identifier).expect("only send valid jobs"); - job.clone() - }; + let build = self.build_for(id).ok_or_else(|| Error::MissingBuild(id.clone()))?; loop { // TODO: pass in `done` to check more frequently... - match build_payload(&context, &self.provider, &self.pool) { - Ok(next) => { - if &next >= &best_payload { - best_payload = Arc::new(next); - let mut state = self.state.lock().unwrap(); - let job = state - .jobs - .get_mut(build_identifier) - .expect("only called for valid jobs"); - job.1 = best_payload.clone(); + let current_value = build.value(); + match build_payload(&build.context, current_value, &self.client, &self.pool) { + Ok(outcome) => match outcome { + BuildOutcome::BetterOrEqual(payload_with_payments) => { + let mut state = build.state.lock().unwrap(); + *state = payload_with_payments; } - } + BuildOutcome::Worse { threshold, provided } => { + tracing::info!( + threshold = ?threshold, + provided = ?provided, + "discarding built payload that did not exceed current value" + ); + } + }, Err(err) => tracing::warn!("error building payload: {err}"), } match done.try_recv() { @@ -423,50 +396,4 @@ impl< std::thread::sleep(std::time::Duration::from_secs(1)) } } - - pub fn build_context_for(&self, id: &BuildIdentifier) -> Option> { - self.state.lock().unwrap().jobs.get(id).map(|job| job.0.clone()) - } - - pub fn payload_for(&self, id: &BuildIdentifier) -> Option> { - self.state.lock().unwrap().jobs.get(id).map(|job| job.1.clone()) - } - - // TODO: support dynamic bidding, over "static" bidding with fixed percent payment - pub async fn bid(&self, id: &BuildIdentifier) -> Result, Error> { - let build_context = - self.build_context_for(id).ok_or_else(|| Error::MissingJob(id.clone()))?; - let payload_with_fees = - self.payload_for(id).ok_or_else(|| Error::MissingJob(id.clone()))?; - - let payload = &payload_with_fees.payload; - let fees = &payload_with_fees.fees; - let revenue = &payload_with_fees.revenue; - - let signed_submission = make_submission( - &self.secret_key, - &self.public_key, - &self.context, - &build_context, - &payload, - *fees, - )?; - - let relays = &build_context.relays; - let mut receipts = vec![]; - for index in relays { - let relay = &self.relays[*index]; - tracing::info!(id = ?id, payment = ?fees, revenue = ?revenue, relay = ?relay, bid = ?signed_submission, "submitting bid"); - match relay.submit_bid(&signed_submission).await { - Ok(receipt) => receipts.push(receipt), - Err(err) => tracing::warn!(err = ?err, "error submitting to relay"), - } - } - - if receipts.len() == relays.len() { - tracing::trace!("successfully submitted bid to all eligible relays"); - } - - Ok(receipts) - } } diff --git a/mev-build-rs/src/mempool_builder/builder/payload_builder.rs b/mev-build-rs/src/mempool_builder/builder/payload_builder.rs index eb85c0b0..c29029ab 100644 --- a/mev-build-rs/src/mempool_builder/builder/payload_builder.rs +++ b/mev-build-rs/src/mempool_builder/builder/payload_builder.rs @@ -1,4 +1,6 @@ -use crate::mempool_builder::builder::{BuildContext, Error, PayloadWithFees}; +/// Payload building logic is heavily inspired by +/// the `reth-basic-payload-builder` package in the `reth` codebase. +use crate::mempool_builder::builder::{BuildContext, Error, PayloadWithPayments}; use ethers::{ signers::Signer, types::{ @@ -50,8 +52,18 @@ pub fn build_payload_with_no_transactions< context: &BuildContext, provider: &Provider, pool: &Pool, -) -> Result { - build_payload_with_transaction_count(context, provider, pool, 0) +) -> Result { + let outcome = build_payload_with_transaction_count(context, U256::ZERO, provider, pool, 0)?; + match outcome { + BuildOutcome::BetterOrEqual(result) => Ok(result), + BuildOutcome::Worse { .. } => unreachable!("invalid use of inner function"), + } +} + +pub enum BuildOutcome { + BetterOrEqual(PayloadWithPayments), + // The `provided` value that did not exceed the `threshold` value requested + Worse { threshold: U256, provided: U256 }, } pub fn build_payload_with_transaction_count< @@ -59,10 +71,11 @@ pub fn build_payload_with_transaction_count< Pool: reth_transaction_pool::TransactionPool, >( context: &BuildContext, + threshold_value: U256, provider: &Provider, pool: &Pool, tx_count: usize, -) -> Result { +) -> Result { let parent_hash = context.parent_hash; let state = State::new(provider.state_by_block_hash(parent_hash)?); @@ -137,7 +150,9 @@ pub fn build_payload_with_transaction_count< executed_txs.push(tx.into_signed()); } - // TODO can abort early if not better fee-wise + if total_fees < threshold_value { + return Ok(BuildOutcome::Worse { threshold: threshold_value, provided: total_fees }) + } // NOTE: next chunk of code implements end-of-block proposer payments // TODO: refactor/encapsulate this lol @@ -250,7 +265,11 @@ pub fn build_payload_with_transaction_count< }; let payload = payload.seal_slow(); - Ok(PayloadWithFees { payload, fees: total_payment, revenue }) + Ok(BuildOutcome::BetterOrEqual(PayloadWithPayments { + payload, + proposer_payment: total_payment, + builder_payment: revenue, + })) } pub fn build_payload< @@ -258,12 +277,9 @@ pub fn build_payload< Provider: reth_provider::StateProviderFactory + reth_provider::BlockReaderIdExt, >( context: &BuildContext, + threshold_value: U256, provider: &Provider, pool: &Pool, -) -> Result { - let id = context.id(); - - tracing::trace!(slot = context.slot, id = %id, "building best payload"); - - build_payload_with_transaction_count(context, provider, pool, usize::MAX) +) -> Result { + build_payload_with_transaction_count(context, threshold_value, provider, pool, usize::MAX) } diff --git a/mev-build-rs/src/mempool_builder/builder/payload_job_generator.rs b/mev-build-rs/src/mempool_builder/builder/reth_ext.rs similarity index 63% rename from mev-build-rs/src/mempool_builder/builder/payload_job_generator.rs rename to mev-build-rs/src/mempool_builder/builder/reth_ext.rs index 5432bb31..4f9524c5 100644 --- a/mev-build-rs/src/mempool_builder/builder/payload_job_generator.rs +++ b/mev-build-rs/src/mempool_builder/builder/reth_ext.rs @@ -1,5 +1,9 @@ +/// Implement the required functionality to interface with the `reth` payload builder +/// functionality, primarily `PayloadJobGenerator`. +/// +/// This module essentially implements a "no-op" builder from the point of view of `reth`, +/// and provides a touch point to signal new payload attributes to this crate's builder. use crate::mempool_builder::Builder; -use futures::FutureExt; use reth_payload_builder::{ error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadId, PayloadJob, PayloadJobGenerator, @@ -8,13 +12,11 @@ use reth_primitives::{SealedBlock, U256}; use std::{ future::{self, Future, Ready}, sync::Arc, - task::{ready, Poll}, + task::Poll, }; -use tokio::sync::mpsc::Sender; pub struct Job { - payload_attributes: PayloadBuilderAttributes, - tx: Sender, + payload_id: PayloadId, } impl Future for Job { @@ -22,41 +24,39 @@ impl Future for Job { fn poll( self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - let this = self.get_mut(); - - let attrs = this.payload_attributes.clone(); - let mut send = Box::pin(this.tx.send(attrs)); - let _ = ready!(send.poll_unpin(cx)); Poll::Ready(Ok(())) } } -impl PayloadJobGenerator for Builder { +impl< + Pool: reth_transaction_pool::TransactionPool, + Provider: reth_provider::StateProviderFactory + reth_provider::BlockReaderIdExt, + > PayloadJobGenerator for Builder +{ type Job = Job; fn new_payload_job( &self, attr: PayloadBuilderAttributes, ) -> Result { - Ok(Job { payload_attributes: attr, tx: self.payload_attributes_tx.clone() }) + let job = Job { payload_id: attr.payload_id() }; + self.on_payload_attributes(attr); + Ok(job) } } -// NOTE: implement `PayloadJob` to satisfy the `reth` trait machinery but -// these functions should never be called, as the default configuration would only use them -// when the CL calls for them. impl PayloadJob for Job { type ResolvePayloadFuture = Ready, PayloadBuilderError>>; fn best_payload(&self) -> Result, PayloadBuilderError> { - let payload = Arc::new(build_default_payload(self.payload_attributes.payload_id())); + let payload = Arc::new(build_default_payload(self.payload_id)); Ok(payload) } fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { - let payload = Arc::new(build_default_payload(self.payload_attributes.payload_id())); + let payload = Arc::new(build_default_payload(self.payload_id)); (future::ready(Ok(payload)), KeepPayloadJobAlive::No) } } diff --git a/mev-build-rs/src/mempool_builder/service.rs b/mev-build-rs/src/mempool_builder/service.rs index 2f4724fc..37b76e14 100644 --- a/mev-build-rs/src/mempool_builder/service.rs +++ b/mev-build-rs/src/mempool_builder/service.rs @@ -1,18 +1,14 @@ use crate::mempool_builder::builder::{BuildIdentifier, Builder}; -use beacon_api_client::mainnet::Client; +use beacon_api_client::mainnet::Client as ApiClient; use ethereum_consensus::{crypto::SecretKey, state_transition::Context}; use ethers::signers::{coins_bip39::English, MnemonicBuilder, Signer}; use futures::StreamExt; use mev_rs::{Error, Network, Relay, RelayEndpoint}; -use reth_payload_builder::PayloadBuilderAttributes; use reth_primitives::{Bytes, ChainSpec}; use serde::Deserialize; use std::{future::Future, pin::Pin, sync::Arc, task::Poll, time::Duration}; use tokio::{ - sync::{ - mpsc::{self, Receiver}, - oneshot, - }, + sync::{mpsc, oneshot}, task::{JoinError, JoinHandle}, }; use url::Url; @@ -27,11 +23,10 @@ pub struct Config { pub execution_mnemonic: String, } -pub struct Service { - builder: Builder, - client: Client, +pub struct Service { + builder: Builder, + api_client: ApiClient, context: Arc, - payload_attributes_rx: Receiver, } fn parse_relays(urls: &[String]) -> Vec { @@ -56,24 +51,23 @@ fn parse_relays(urls: &[String]) -> Vec { impl< Pool: reth_transaction_pool::TransactionPool + 'static, - Provider: reth_provider::StateProviderFactory + reth_provider::BlockReaderIdExt + Clone + 'static, - > Service + Client: reth_provider::StateProviderFactory + reth_provider::BlockReaderIdExt + Clone + 'static, + > Service { pub fn from( config: Config, context: Option, pool: Pool, - provider: Provider, + client: Client, chain_spec: Arc, - ) -> Result<(Self, Builder), Error> { + ) -> Result<(Self, Builder), Error> { let secret_key = config.secret_key; let network = &config.network; let context = if let Some(context) = context { context } else { Context::try_from(network)? }; let context = Arc::new(context); let beacon_api_endpoint: Url = config.beacon_api_endpoint.parse().unwrap(); - let client = Client::new(beacon_api_endpoint); - let (tx, rx) = mpsc::channel(16); + let api_client = ApiClient::new(beacon_api_endpoint); let relays = parse_relays(&config.relays); let mut derivation_index = 0; @@ -94,27 +88,29 @@ impl< let builder = Builder::new( secret_key, context.clone(), - tx, relays, pool, - provider, + client, chain_spec, Bytes::from(config.extra_data.as_bytes()), builder_wallet, ); - Ok(( - Service { builder: builder.clone(), client, context, payload_attributes_rx: rx }, - builder, - )) + Ok((Service { builder: builder.clone(), api_client, context }, builder)) } pub async fn spawn(self) -> Result { - let Self { builder, client, context, mut payload_attributes_rx } = self; + let Self { builder, api_client, context } = self; - let genesis_details = client.get_genesis_details().await?; - // let genesis_validators_root = genesis_details.genesis_validators_root; + // TODO: refactor so we only call the client if we are on a network we don't support already + let genesis_details = api_client.get_genesis_details().await?; let clock = context.clock(Some(genesis_details.genesis_time)); + if clock.before_genesis() { + let genesis = clock.duration_until_next_slot(); + tracing::warn!(duration = ?genesis, "waiting until genesis"); + tokio::time::sleep(genesis).await; + } + let slot_provider = clock.clone(); let current_epoch = clock.current_epoch().unwrap(); @@ -155,37 +151,49 @@ impl< tokio::time::sleep(start).await; - if let Err(err) = builder.bid(&id).await { - tracing::warn!("error submitting bid to relay: {err}"); + if let Err(err) = builder.submit_bid(&id).await { + tracing::warn!("error submitting bid to relays: {err}"); } // NOTE: `send` to stop job for this `build_identifier` + // TODO: handle job expiry and clean up let _ = done_tx.send(()); } } }); let builder = tokio::spawn(async move { - loop { - // NOTE / TODO: can refactor to just call `process_payload_attributes` inside the - // `reth` callback then, this is just a stream of jobs to start... - if let Some(attrs) = payload_attributes_rx.recv().await { - let slot = slot_provider.slot_at_time(attrs.timestamp).expect("past genesis"); - match builder.process_payload_attributes(slot, attrs).await { - Ok(build_identifier) => { - let builder = builder.clone(); - let id = build_identifier.clone(); - let (done_tx, done_rx) = oneshot::channel(); - tokio::task::spawn_blocking(move || { - if let Err(err) = builder.build(&id, done_rx) { - tracing::warn!("failed to build payload for {id}: {err}"); - } - }); - let _ = to_bidder.send((build_identifier, done_tx)).await; - } - Err(err) => { - tracing::warn!("{err}"); - } + // TODO: move to Future> + let payload_attributes = match builder.stream_payload_attributes() { + Ok(stream) => stream, + Err(err) => { + tracing::error!(err = ?err, "could not open payload attributes stream"); + return + } + }; + + tokio::pin!(payload_attributes); + + while let Some(attrs) = payload_attributes.next().await { + let slot = slot_provider.slot_at_time(attrs.timestamp).expect("past genesis"); + match builder.process_payload_attributes(slot, attrs) { + Ok(Some(build_identifier)) => { + let builder = builder.clone(); + let id = build_identifier.clone(); + let (done_tx, done_rx) = oneshot::channel(); + tokio::task::spawn_blocking(move || { + tracing::trace!(id = ?id, "starting build"); + if let Err(err) = builder.construct_best_payload(&id, done_rx) { + tracing::warn!(id = ?id, err = ?err, "failed to progress build"); + } + }); + let _ = to_bidder.send((build_identifier, done_tx)).await; + } + Ok(_) => { + tracing::trace!(slot = ?slot, "skipping processing of duplicate payload attributes") + } + Err(err) => { + tracing::warn!(err = ?err, "could not process payload attributes"); } } } diff --git a/mev-rs/src/blinded_block_relayer/mod.rs b/mev-rs/src/blinded_block_relayer/mod.rs index 11b375ef..00d39390 100644 --- a/mev-rs/src/blinded_block_relayer/mod.rs +++ b/mev-rs/src/blinded_block_relayer/mod.rs @@ -14,6 +14,5 @@ use async_trait::async_trait; pub trait BlindedBlockRelayer { async fn get_proposal_schedule(&self) -> Result, Error>; - // TODO: support cancellations? async fn submit_bid(&self, signed_submission: &SignedBidSubmission) -> Result<(), Error>; } diff --git a/mev-rs/src/relay.rs b/mev-rs/src/relay.rs index 87b8a1dc..0b8032c8 100644 --- a/mev-rs/src/relay.rs +++ b/mev-rs/src/relay.rs @@ -2,7 +2,7 @@ use crate::{ blinded_block_provider::Client as BlockProvider, blinded_block_relayer::{BlindedBlockRelayer, Client as Relayer}, error::Error, - types::{ProposerSchedule, SignedBidReceipt, SignedBidSubmission}, + types::{ProposerSchedule, SignedBidSubmission}, }; use async_trait::async_trait; use beacon_api_client::Client as BeaconClient; @@ -68,10 +68,7 @@ impl BlindedBlockRelayer for Relay { self.relayer.get_proposal_schedule().await } - async fn submit_bid( - &self, - signed_submission: &SignedBidSubmission, - ) -> Result { + async fn submit_bid(&self, signed_submission: &SignedBidSubmission) -> Result<(), Error> { self.relayer.submit_bid(signed_submission).await } } diff --git a/mev-rs/src/types/mod.rs b/mev-rs/src/types/mod.rs index b7d007d9..d8cb9b2f 100644 --- a/mev-rs/src/types/mod.rs +++ b/mev-rs/src/types/mod.rs @@ -380,5 +380,7 @@ pub struct BidTrace { #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct SignedBidSubmission { pub message: BidTrace, + // TODO: support multiple forks + pub execution_payload: capella::ExecutionPayload, pub signature: BlsSignature, }