diff --git a/Cargo.lock b/Cargo.lock index 55fd621db..9ef11f048 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1595,20 +1595,54 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.2", + "tokio", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum" version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ - "axum-core", + "axum-core 0.5.5", "bytes", "futures-util", "http 1.4.0", "http-body 1.0.1", "http-body-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -1620,6 +1654,27 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.5.5" @@ -6954,6 +7009,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -9463,6 +9524,7 @@ dependencies = [ "alloy-trie 0.8.1", "assert_matches", "async-trait", + "axum 0.7.9", "beacon-api-client", "bid-scraper", "bigdecimal 0.4.9", @@ -9486,6 +9548,7 @@ dependencies = [ "foldhash 0.1.5", "futures", "governor", + "hex", "itertools 0.11.0", "jsonrpsee 0.20.4", "lazy_static", @@ -9545,6 +9608,7 @@ dependencies = [ "toml 0.8.23", "tonic 0.13.1", "tonic-build", + "tower 0.4.13", "tracing", "tracing-subscriber 0.3.22", "url", @@ -13616,6 +13680,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -15130,7 +15205,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" dependencies = [ "async-trait", - "axum", + "axum 0.8.7", "base64 0.22.1", "bytes", "h2 0.4.12", diff --git a/crates/rbuilder-operator/src/flashbots_config.rs b/crates/rbuilder-operator/src/flashbots_config.rs index 1d91e0520..cc07ca45e 100644 --- a/crates/rbuilder-operator/src/flashbots_config.rs +++ b/crates/rbuilder-operator/src/flashbots_config.rs @@ -174,6 +174,7 @@ impl LiveBuilderConfig for FlashbotsConfig { bid_observer, bidding_service.clone(), cancellation_token.clone(), + None, // No EPBS block observer in flashbots config ) .await?; diff --git a/crates/rbuilder-primitives/src/epbs/bid.rs b/crates/rbuilder-primitives/src/epbs/bid.rs new file mode 100644 index 000000000..930919f8d --- /dev/null +++ b/crates/rbuilder-primitives/src/epbs/bid.rs @@ -0,0 +1,106 @@ +//! ExecutionPayloadBid types for EPBS. +//! +//! These types represent the builder's commitment to produce an execution payload. +//! See: https://github.com/ethereum/consensus-specs/blob/master/specs/gloas/builder.md + +use alloy_primitives::{Address, BlockHash, Bytes, B256}; +use alloy_rpc_types_beacon::BlsSignature; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; + +/// Signing domain for EPBS builder bids. +/// From consensus-specs/specs/gloas/beacon-chain.md: +/// | DOMAIN_BEACON_BUILDER | DomainType('0x0B000000') | +pub const DOMAIN_BEACON_BUILDER: [u8; 4] = [0x0B, 0x00, 0x00, 0x00]; + +/// from consensus-specs/specs/gloas/beacon-chain.md: +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ExecutionPayloadBid { + /// hash of the current head of execution chain + pub parent_block_hash: BlockHash, + /// hash tree root of the beacon block the proposer will build on + pub parent_block_root: B256, + /// this is the blockhash which the builder constructed the payload + pub block_hash: BlockHash, + /// previous RANDAO of the constructed payload + pub prev_randao: B256, + /// execution address to receive the payment + pub fee_recipient: Address, + /// gas limit of the constructed payload + #[serde_as(as = "DisplayFromStr")] + pub gas_limit: u64, + /// validator index of the builder performing these actions. + #[serde_as(as = "DisplayFromStr")] + pub builder_index: u64, + /// to be the slot for which this bid is aimed. + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, + /// to be the value (in gwei) that the builder will pay the proposer if the bid is accepted + #[serde_as(as = "DisplayFromStr")] + pub value: u64, + /// must be zero for in protocol payments. non-zero only if proposer accepts trusted payments + #[serde_as(as = "DisplayFromStr")] + pub execution_payment: u64, + /// blob commitments for the payload. + /// per spec: List[KZGCommitment, MAX_BLOB_COMMITMENTS_PER_BLOCK] + pub blob_kzg_commitments: Vec, +} + +impl ExecutionPayloadBid { + /// Returns the total payment to the proposer (value + execution_payment). + pub fn total_value(&self) -> u64 { + self.value.saturating_add(self.execution_payment) + } + + /// Returns true if this bid uses only in-protocol (beacon chain) payment. + pub fn is_in_protocol_payment(&self) -> bool { + self.execution_payment == 0 + } +} + +/// SignedExecutionPayloadBid is a signed commitment from a builder. +/// +/// signature is created using the builder's validator key and the +/// DOMAIN_BEACON_BUILDER domain. +/// +/// from consensus-specs/specs/gloas/beacon-chain.md: + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct SignedExecutionPayloadBid { + /// execution payload + pub message: ExecutionPayloadBid, + /// bls signature over the bid using the builder's validator key. + pub signature: BlsSignature, +} + +/// resp for get_bid endpoint. +/// +/// This follows the Builder API spec for EPBS: +/// GET /eth/v1/builder/execution_payload_bid/{slot}/{parent_hash}/{parent_root}/{proposer_index} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetExecutionPayloadBidResponse { + /// The fork version, e.g., "gloas". + pub version: String, + /// signed bid using validator signature + pub data: SignedExecutionPayloadBid, +} + +/// the params are for the get_bid endpoint following the builder-sepc +#[derive(Debug, Clone)] +pub struct GetBidParams { + /// slot for which the bid is being considered for + pub slot: u64, + /// hash of the parent block the proposer will upon + pub parent_hash: BlockHash, + /// root of the parent block the proposer will build upon + pub parent_root: B256, + /// to be reitrved from the path params + pub proposer_index: u64, + /// address from the X-Fee-Recipient header + pub fee_recipient: Address, + /// timeout ms for request via X-Timeout-Ms header + pub timeout_ms: Option, + /// timestamp from Date-Milliseconds header for latency measurement + pub date_milliseconds: Option, +} diff --git a/crates/rbuilder-primitives/src/epbs/envelope.rs b/crates/rbuilder-primitives/src/epbs/envelope.rs new file mode 100644 index 000000000..47c7c8120 --- /dev/null +++ b/crates/rbuilder-primitives/src/epbs/envelope.rs @@ -0,0 +1,122 @@ +//! ExecutionPayloadEnvelope types for EPBS. +//! +//! These types represent the full execution payload that the builder reveals +//! after their bid is included in a beacon block. +//! See: https://github.com/ethereum/consensus-specs/blob/master/specs/gloas/builder.md + +use alloy_primitives::{Bytes, B256}; +use alloy_rpc_types_beacon::BlsSignature; +use alloy_rpc_types_engine::ExecutionPayloadV3; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; + +/// ExecutionPayloadEnvelope contains the full execution payload and associated data. +/// +/// This is revealed by the builder after their SignedExecutionPayloadBid is included +/// in a beacon block. The envelope is broadcast on the `execution_payload` P2P topic. +/// +/// From consensus-specs/specs/gloas/beacon-chain.md: + +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ExecutionPayloadEnvelope { + /// The full execution payload. + /// TODO: This should be the Gloas-specific ExecutionPayload when available in Alloy. + pub payload: ExecutionPayloadV3, + /// Execution requests (deposits, withdrawals, consolidations). + /// TODO: Use proper ExecutionRequests type from Alloy when available. + pub execution_requests: ExecutionRequests, + /// Validator index of the builder. + #[serde_as(as = "DisplayFromStr")] + pub builder_index: u64, + /// Hash tree root of the beacon block that included this builder's bid. + pub beacon_block_root: B256, + /// Slot of the beacon block. + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, + /// State root after applying the execution payload. + pub state_root: B256, +} + +/// Placeholder for ExecutionRequests until available in Alloy. +/// TODO: Replace with alloy_rpc_types_beacon::requests::ExecutionRequestsV4 or equivalent. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct ExecutionRequests { + /// Deposit requests from the execution layer. + #[serde(default)] + pub deposits: Vec, + /// Withdrawal requests from the execution layer. + #[serde(default)] + pub withdrawals: Vec, + /// Consolidation requests from the execution layer. + #[serde(default)] + pub consolidations: Vec, +} + +/// SignedExecutionPayloadEnvelope is the envelope signed by the builder. +/// +/// From consensus-specs/specs/gloas/beacon-chain.md: +/// ```python +/// class SignedExecutionPayloadEnvelope(Container): +/// message: ExecutionPayloadEnvelope +/// signature: BLSSignature +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SignedExecutionPayloadEnvelope { + /// The execution payload envelope message. + pub message: ExecutionPayloadEnvelope, + /// BLS signature over the envelope using the builder's validator key. + pub signature: BlsSignature, +} + +/// Cached payload data stored by the builder after creating a bid. +/// +/// When a builder creates an ExecutionPayloadBid, they must store the full +/// payload data so they can reveal it when/if their bid is accepted. +#[derive(Debug, Clone)] +pub struct CachedPayloadData { + /// The signed bid that was broadcast/returned. + pub bid: super::SignedExecutionPayloadBid, + /// The full execution payload (to be revealed later). + pub payload: ExecutionPayloadV3, + /// Execution requests associated with the payload. + pub execution_requests: ExecutionRequests, + /// Blob KZG commitments. + pub blob_kzg_commitments: Vec, + /// Timestamp when this cache entry was created. + pub created_at: std::time::Instant, +} + +impl CachedPayloadData { + /// Creates a new cached payload entry. + pub fn new( + bid: super::SignedExecutionPayloadBid, + payload: ExecutionPayloadV3, + execution_requests: ExecutionRequests, + blob_kzg_commitments: Vec, + ) -> Self { + Self { + bid, + payload, + execution_requests, + blob_kzg_commitments, + created_at: std::time::Instant::now(), + } + } + + /// Build the envelope from cached data and the beacon block info. + pub fn build_envelope( + &self, + beacon_block_root: B256, + state_root: B256, + ) -> ExecutionPayloadEnvelope { + ExecutionPayloadEnvelope { + payload: self.payload.clone(), + execution_requests: self.execution_requests.clone(), + builder_index: self.bid.message.builder_index, + beacon_block_root, + slot: self.bid.message.slot, + state_root, + } + } +} diff --git a/crates/rbuilder-primitives/src/epbs/mod.rs b/crates/rbuilder-primitives/src/epbs/mod.rs new file mode 100644 index 000000000..ac21317d1 --- /dev/null +++ b/crates/rbuilder-primitives/src/epbs/mod.rs @@ -0,0 +1,7 @@ +mod bid; +mod envelope; +mod proposer_preferences; + +pub use bid::*; +pub use envelope::*; +pub use proposer_preferences::*; diff --git a/crates/rbuilder-primitives/src/epbs/proposer_preferences.rs b/crates/rbuilder-primitives/src/epbs/proposer_preferences.rs new file mode 100644 index 000000000..b051f2639 --- /dev/null +++ b/crates/rbuilder-primitives/src/epbs/proposer_preferences.rs @@ -0,0 +1,28 @@ +use alloy_primitives::Address; +use alloy_rpc_types_beacon::BlsSignature; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; + +/// Proposer preferences broadcast via the `proposer_preferences` gossip topic. +/// from consensus-specs/specs/gloas/p2p-interface.md: +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ProposerPreferences { + /// slot at which the validator will propose. + #[serde_as(as = "DisplayFromStr")] + pub proposal_slot: u64, + ///validator index of the proposer. + #[serde_as(as = "DisplayFromStr")] + pub validator_index: u64, + /// proposers preferred fee recipient + pub fee_recipient: Address, + /// proposer's preferred gas limit + #[serde_as(as = "DisplayFromStr")] + pub gas_limit: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct SignedProposerPreferences { + pub message: ProposerPreferences, + pub signature: BlsSignature, +} diff --git a/crates/rbuilder-primitives/src/lib.rs b/crates/rbuilder-primitives/src/lib.rs index 4a860e50e..d34e6e512 100644 --- a/crates/rbuilder-primitives/src/lib.rs +++ b/crates/rbuilder-primitives/src/lib.rs @@ -1,6 +1,7 @@ //! Order types used as elements for block building. pub mod built_block; +pub mod epbs; pub mod evm_inspector; pub mod fmt; pub mod mev_boost; diff --git a/crates/rbuilder-primitives/src/mev_boost/ssz_roots.rs b/crates/rbuilder-primitives/src/mev_boost/ssz_roots.rs index eb5d3029f..91f42b92a 100644 --- a/crates/rbuilder-primitives/src/mev_boost/ssz_roots.rs +++ b/crates/rbuilder-primitives/src/mev_boost/ssz_roots.rs @@ -145,3 +145,37 @@ pub fn sha_pair(a: &B256, b: &B256) -> B256 { h.update(b); B256::from_slice(&h.finalize()) } + +/// KZG commitment is 48 bytes. +#[derive(tree_hash_derive::TreeHash)] +struct KzgCommitment { + inner: FixedVector, +} + +impl From<&[u8]> for KzgCommitment { + fn from(bytes: &[u8]) -> Self { + let mut inner = vec![0u8; 48]; + let len = bytes.len().min(48); + inner[..len].copy_from_slice(&bytes[..len]); + Self { + inner: FixedVector::from(inner), + } + } +} + +/// MAX_BLOB_COMMITMENTS_PER_BLOCK for Gloas. +type MaxBlobCommitmentsPerBlock = typenum::U4096; + +/// Calculate SSZ hash_tree_root for blob KZG commitments. +/// +/// This computes the Merkle root of the list of KZG commitments as required +/// by the consensus specs for ExecutionPayloadBid.blob_kzg_commitments_root. +pub fn calculate_blob_kzg_commitments_root_ssz(commitments: &[impl AsRef<[u8]>]) -> B256 { + let commitments: VariableList = VariableList::from( + commitments + .iter() + .map(|c| KzgCommitment::from(c.as_ref())) + .collect::>(), + ); + B256::from_slice(&commitments.tree_hash_root()[..]) +} diff --git a/crates/rbuilder/Cargo.toml b/crates/rbuilder/Cargo.toml index 268a7b711..1c7974716 100644 --- a/crates/rbuilder/Cargo.toml +++ b/crates/rbuilder/Cargo.toml @@ -114,6 +114,9 @@ derivative.workspace = true mockall = "0.12.1" shellexpand = "3.1.0" async-trait = "0.1.80" +axum = "0.7" +hex = "0.4" +tower = "0.4" eth-sparse-mpt.workspace = true bid-scraper.workspace = true sysperf.workspace = true diff --git a/crates/rbuilder/src/beacon_api_client/mod.rs b/crates/rbuilder/src/beacon_api_client/mod.rs index 6fb05c7e7..1bf9605b8 100644 --- a/crates/rbuilder/src/beacon_api_client/mod.rs +++ b/crates/rbuilder/src/beacon_api_client/mod.rs @@ -1,16 +1,110 @@ +use alloy_primitives::B256; use alloy_rpc_types_beacon::events::PayloadAttributesEvent; use beacon_api_client::{mainnet::Client as bClient, Error, Topic}; use mev_share_sse::client::EventStream; -use serde::Deserialize; +use rbuilder_primitives::epbs::{ + ExecutionPayloadEnvelope, ExecutionRequests, SignedExecutionPayloadBid, + SignedExecutionPayloadEnvelope, SignedProposerPreferences, +}; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use std::{collections::HashMap, fmt::Debug}; use url::Url; +#[derive(Debug, Clone, Deserialize)] +pub struct GenesisData { + #[serde(with = "serde_utils::quoted_u64")] + pub genesis_time: u64, + pub genesis_validators_root: B256, + #[serde(with = "serde_utils::bytes_4_hex")] + pub genesis_fork_version: [u8; 4], +} + +#[derive(Debug, Clone, Deserialize)] +struct GenesisResponse { + data: GenesisData, +} + +/// Validator data from the beacon chain. +#[derive(Debug, Clone, Deserialize)] +pub struct ValidatorData { + /// validator index + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + /// validators balance in gwei + #[serde(with = "serde_utils::quoted_u64")] + pub balance: u64, + /// validators status + pub status: String, + /// validator details + pub validator: ValidatorDetails, +} + +/// Detailed validator information. +#[derive(Debug, Clone, Deserialize)] +pub struct ValidatorDetails { + pub pubkey: String, + pub withdrawal_credentials: String, + #[serde(with = "serde_utils::quoted_u64")] + pub effective_balance: u64, + pub slashed: bool, + #[serde(with = "serde_utils::quoted_u64")] + pub activation_eligibility_epoch: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub activation_epoch: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub exit_epoch: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub withdrawable_epoch: u64, +} + +#[derive(Debug, Clone, Deserialize)] +struct ValidatorResponse { + data: ValidatorData, +} + +mod serde_utils { + use serde::{Deserialize, Deserializer}; + + pub mod quoted_u64 { + use super::*; + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + s.parse().map_err(serde::de::Error::custom) + } + } + + pub mod bytes_4_hex { + use super::*; + + pub fn deserialize<'de, D>(deserializer: D) -> Result<[u8; 4], D::Error> + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + let s = s.strip_prefix("0x").unwrap_or(&s); + let bytes = hex::decode(s).map_err(serde::de::Error::custom)?; + if bytes.len() != 4 { + return Err(serde::de::Error::custom("expected 4 bytes")); + } + let mut arr = [0u8; 4]; + arr.copy_from_slice(&bytes); + Ok(arr) + } + } +} + pub const DEFAULT_CL_NODE_URL: &str = "http://localhost:8000"; #[derive(Deserialize, Clone)] #[serde(try_from = "String")] pub struct Client { inner: bClient, + endpoint_url: Url, } impl Debug for Client { @@ -21,8 +115,10 @@ impl Debug for Client { impl Default for Client { fn default() -> Self { + let url = Url::parse(DEFAULT_CL_NODE_URL).unwrap(); Self { - inner: bClient::new(Url::parse(DEFAULT_CL_NODE_URL).unwrap()), + inner: bClient::new(url.clone()), + endpoint_url: url, } } } @@ -30,10 +126,15 @@ impl Default for Client { impl Client { pub fn new(endpoint: Url) -> Self { Self { - inner: bClient::new(endpoint), + inner: bClient::new(endpoint.clone()), + endpoint_url: endpoint, } } + pub fn endpoint(&self) -> &Url { + &self.endpoint_url + } + pub async fn get_spec(&self) -> Result, Error> { self.inner.get_spec().await } @@ -41,6 +142,218 @@ impl Client { pub async fn get_events(&self) -> Result, Error> { self.inner.get_events::().await } + + /// Fetch genesis data from the beacon chain. + /// Returns the genesis time, genesis validators root, and genesis fork version. + pub async fn get_genesis(&self) -> eyre::Result { + let url = self + .endpoint_url + .join("eth/v1/beacon/genesis") + .map_err(|e| eyre::eyre!("Invalid URL: {}", e))?; + + let response = reqwest::get(url).await?; + + if !response.status().is_success() { + return Err(eyre::eyre!("Failed to get genesis: {}", response.status())); + } + + let genesis_response: GenesisResponse = response.json().await?; + + Ok(genesis_response.data) + } + + /// Fetch validator data from the beacon chain by pubkey or index. + /// + /// The `validator_id` can be either: + /// - A hex encoded BLS public key + /// - A validator index as a string + pub async fn get_validator(&self, validator_id: &str) -> eyre::Result { + let path = format!("eth/v1/beacon/states/head/validators/{}", validator_id); + let url = self + .endpoint_url + .join(&path) + .map_err(|e| eyre::eyre!("Invalid URL: {}", e))?; + + let response = reqwest::get(url).await?; + + if response.status() == reqwest::StatusCode::NOT_FOUND { + return Err(eyre::eyre!( + "Validator not found: {}. Make sure the builder validator is registered and active on the beacon chain.", + validator_id + )); + } + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(eyre::eyre!( + "Failed to get validator {}: {} - {}", + validator_id, + status, + body + )); + } + + let validator_response: ValidatorResponse = response.json().await?; + + Ok(validator_response.data) + } + + /// Fetch validator data from the beacon chain by BLS public key. + /// jut a helper method that formats the public key correctly. + pub async fn get_validator_by_pubkey(&self, pubkey: &[u8]) -> eyre::Result { + let pubkey_hex = format!("0x{}", hex::encode(pubkey)); + self.get_validator(&pubkey_hex).await + } + + /// Submit a signed execution payload bid to p2p via the beacon node. + pub async fn submit_execution_payload_bid( + &self, + bid: &SignedExecutionPayloadBid, + ) -> eyre::Result<()> { + let url = self + .endpoint_url + .join("eth/v1/beacon/execution_payload_bid") + .map_err(|e| eyre::eyre!("Invalid URL: {}", e))?; + + let response = reqwest::Client::new() + .post(url) + .header("Eth-Consensus-Version", "gloas") + .json(bid) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(eyre::eyre!( + "Failed to submit execution payload bid: {} - {}", + status, + body + )); + } + + Ok(()) + } + + /// Submit a signed execution payload envelope to p2p via the beacon node. + pub async fn submit_execution_payload_envelope( + &self, + envelope: &SignedExecutionPayloadEnvelope, + ) -> eyre::Result<()> { + let url = self + .endpoint_url + .join("eth/v1/beacon/execution_payload_envelope") + .map_err(|e| eyre::eyre!("Invalid URL: {}", e))?; + + let response = reqwest::Client::new() + .post(url) + .header("Eth-Consensus-Version", "gloas") + .json(envelope) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(eyre::eyre!( + "Failed to submit execution payload envelope: {} - {}", + status, + body + )); + } + + Ok(()) + } + + /// Construct an unsigned execution payload envelope via the beacon node. + /// + /// Returns a complete unsigned envelope ready for the builder to sign. + // TODO: I am using the beacon api from this PR not merged yet + // please keep monitor it and see if it gets merged + // : https://github.com/ethereum/beacon-APIs/pull/584 + pub async fn construct_execution_payload_envelope( + &self, + beacon_block_root: B256, + payload: &alloy_rpc_types_engine::ExecutionPayloadV3, + execution_requests: &ExecutionRequests, + ) -> eyre::Result { + let url = self + .endpoint_url + .join("eth/v1/builder/execution_payload_envelope") + .map_err(|e| eyre::eyre!("Invalid URL: {}", e))?; + + let request_body = ConstructEnvelopeRequest { + version: "gloas".to_string(), + data: ConstructEnvelopeData { + beacon_block_root, + execution_payload: payload.clone(), + execution_requests: execution_requests.clone(), + }, + }; + + let response = reqwest::Client::new() + .post(url) + .header("Eth-Consensus-Version", "gloas") + .json(&request_body) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(eyre::eyre!( + "Failed to construct execution payload envelope: {} - {}", + status, + body + )); + } + + let envelope_response: ConstructEnvelopeResponse = response.json().await?; + Ok(envelope_response.data) + } + + /// Fetch a beacon block + /// used to check which bid was included in a beacon block after a head event. + pub async fn get_beacon_block_bid( + &self, + block_id: &str, + ) -> eyre::Result> { + let path = format!("eth/v2/beacon/blocks/{}", block_id); + let url = self + .endpoint_url + .join(&path) + .map_err(|e| eyre::eyre!("Invalid URL: {}", e))?; + + let response = reqwest::get(url).await?; + + if response.status() == reqwest::StatusCode::NOT_FOUND { + return Ok(None); + } + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(eyre::eyre!( + "Failed to get beacon block {}: {} - {}", + block_id, + status, + body + )); + } + // parsring to get the bid from the response. + let block_response: serde_json::Value = response.json().await?; + let bid = block_response + .get("data") + .and_then(|d| d.get("message")) + .and_then(|m| m.get("body")) + .and_then(|b| b.get("signed_execution_payload_bid")) + .map(|bid_value| serde_json::from_value::(bid_value.clone())) + .transpose() + .map_err(|e| eyre::eyre!("Failed to parse bid from block: {}", e))?; + + Ok(bid) + } } impl TryFrom for Client { @@ -48,7 +361,10 @@ impl TryFrom for Client { fn try_from(s: String) -> Result { let url = Url::parse(&s)?; - Ok(Client::new(url)) + Ok(Self { + inner: bClient::new(url.clone()), + endpoint_url: url, + }) } } @@ -60,6 +376,67 @@ impl Topic for PayloadAttributesTopic { type Data = PayloadAttributesEvent; } +/// SSE topic for head events from the beacon node. +pub struct HeadTopic; + +impl Topic for HeadTopic { + const NAME: &'static str = "head"; + + type Data = HeadEvent; +} + +/// SSE topic for execution payload bid events. +pub struct ExecutionPayloadBidTopic; + +impl Topic for ExecutionPayloadBidTopic { + const NAME: &'static str = "execution_payload_bid"; + + type Data = SignedExecutionPayloadBid; +} + +/// SSE topic for proposer preferences events. +pub struct ProposerPreferencesTopic; + +impl Topic for ProposerPreferencesTopic { + const NAME: &'static str = "proposer_preferences"; + + type Data = SignedProposerPreferences; +} + +/// Head event from the beacon node SSE stream. +#[serde_as] +#[derive(Debug, Clone, Deserialize)] +pub struct HeadEvent { + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, + pub block: B256, + /// state root of the new head state. + pub state: B256, + #[serde(default)] + pub execution_optimistic: bool, +} + +/// Request body for POST /eth/v1/builder/execution_payload_envelope. +#[derive(Debug, Clone, Serialize)] +struct ConstructEnvelopeRequest { + version: String, + data: ConstructEnvelopeData, +} + +/// Data payload for the envelope construction request. +#[derive(Debug, Clone, Serialize)] +struct ConstructEnvelopeData { + beacon_block_root: B256, + execution_payload: alloy_rpc_types_engine::ExecutionPayloadV3, + execution_requests: ExecutionRequests, +} + +/// Response from POST /eth/v1/builder/execution_payload_envelope. +#[derive(Debug, Clone, Deserialize)] +struct ConstructEnvelopeResponse { + data: ExecutionPayloadEnvelope, +} + #[cfg(test)] mod tests { // TODO: Enable these tests. diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 34e57ea87..1ffd60b5d 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -264,6 +264,8 @@ impl BaseConfig { simulation_use_random_coinbase: self.simulation_use_random_coinbase, faster_finalize: self.faster_finalize, order_flow_tracer_manager, + epbs_server: None, + epbs_p2p_service: None, }) } diff --git a/crates/rbuilder/src/live_builder/block_output/block_observer.rs b/crates/rbuilder/src/live_builder/block_output/block_observer.rs new file mode 100644 index 000000000..f2c98aa7d --- /dev/null +++ b/crates/rbuilder/src/live_builder/block_output/block_observer.rs @@ -0,0 +1,57 @@ +//! Block observer interface for notifying external systems of built blocks. +//! +//! This module provides the `BlockObserver` trait that allows components to be +//! notified when new blocks are built. The primary use case is notifying the +//! EPBS Builder API server of new blocks so it can generate bids. + +use crate::building::builders::Block; +use alloy_primitives::BlockHash; +use std::sync::Arc; + +/// Observer that receives notifications when blocks are built. +/// +/// Implementations can use these notifications to: +/// - Generate EPBS bids +pub trait BlockObserver: Send + Sync + std::fmt::Debug { + /// Called when a new block has been finalized and is ready for submission. + /// + /// # Arguments + /// * `slot` - The slot number this block is for + /// * `parent_hash` - The execution layer parent block hash + /// * `block` - The finalized block + /// + /// Note: The beacon chain `parent_root` is NOT passed here because it's not + /// available at block building time. The proposer provides it in the bid request. + fn on_block_built(&self, slot: u64, parent_hash: BlockHash, block: &Block); +} + +/// A no-op observer that does nothing. +/// Used as a default when no observer is configured. +#[derive(Debug, Clone, Default)] +pub struct NoOpBlockObserver; + +impl BlockObserver for NoOpBlockObserver { + fn on_block_built(&self, _slot: u64, _parent_hash: BlockHash, _block: &Block) { + // No-op + } +} + +/// A multi observer that forwards notifications to multiple observers. +#[derive(Debug)] +pub struct MultiBlockObserver { + observers: Vec>, +} + +impl MultiBlockObserver { + pub fn new(observers: Vec>) -> Self { + Self { observers } + } +} + +impl BlockObserver for MultiBlockObserver { + fn on_block_built(&self, slot: u64, parent_hash: BlockHash, block: &Block) { + for observer in &self.observers { + observer.on_block_built(slot, parent_hash, block); + } + } +} diff --git a/crates/rbuilder/src/live_builder/block_output/mod.rs b/crates/rbuilder/src/live_builder/block_output/mod.rs index 39d312ae5..756e43144 100644 --- a/crates/rbuilder/src/live_builder/block_output/mod.rs +++ b/crates/rbuilder/src/live_builder/block_output/mod.rs @@ -1,5 +1,6 @@ pub mod best_block_from_algorithms; pub mod bidding_service_interface; +pub mod block_observer; pub mod relay_submit; pub mod true_value_bidding_service; pub mod unfinished_block_processing; diff --git a/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs b/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs index 7fe3ba2fc..43dcb565c 100644 --- a/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs +++ b/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs @@ -9,6 +9,7 @@ use ahash::HashMap; /// 6. Bidding service asks to finalize that block with concrete proposer value /// 7. Finalized block is adjusted to pay chosen amount to the proposer (`finalize_worker` thread) /// 8. Resulting block is submitted to `BlockBuildingSink` (in running builder its used by a thread that submits block to relays). +/// 9. Block observers (like EPBS bid provider) are notified of the new block. /// /// Alternatively if configured (adjust_finalized_blocks = true) to run using old flow `prefinalize_worker` would not do anything with the block /// and `finalize_worker` would do full finalization instead of adjustment of the finalize block. @@ -56,6 +57,7 @@ use super::{ BiddingService, BlockSealInterfaceForSlotBidder, BuiltBlockDescriptorForSlotBidder, SlotBidder, SlotBidderSealBidCommand, }, + block_observer::BlockObserver, relay_submit::RelaySubmitSinkFactory, }; @@ -67,6 +69,7 @@ use crate::live_builder::building::built_block_cache::BuiltBlockCache; /// 1. UnfinishedBuiltBlocksInput and starts `prefinalize_worker` and `finalize_worker` threads. /// 2. SlotBidder from BiddingService to manage bidding values for the sealed blocks /// 3. BlockBuildingSink to send finished blocks for relay submission +/// 4. Notifies block observers (like EPBS bid provider) of finalized blocks. #[derive(Derivative)] #[derivative(Debug)] pub struct UnfinishedBuiltBlocksInputFactory

{ @@ -81,6 +84,9 @@ pub struct UnfinishedBuiltBlocksInputFactory

{ adjust_finalized_blocks: bool, /// relay sets well get on bids. relay_sets: Vec, + /// Optional block observer for EPBS and other integrations. + #[derivative(Debug = "ignore")] + block_observer: Option>, } impl UnfinishedBuiltBlocksInputFactory

{ @@ -97,9 +103,16 @@ impl UnfinishedBuiltBlocksInputFactory

{ wallet_balance_watcher, adjust_finalized_blocks, relay_sets, + block_observer: None, } } + /// Set the block observer for EPBS integration. + pub fn with_block_observer(mut self, observer: Arc) -> Self { + self.block_observer = Some(observer); + self + } + pub fn create_sink( &mut self, slot_data: MevBoostSlotData, @@ -146,6 +159,10 @@ impl UnfinishedBuiltBlocksInputFactory

{ .create_builder_sink(slot_data.clone(), cancel.clone()) .into(); + // extract slot info for block observer + let slot = slot_data.slot(); + let parent_hash = slot_data.payload_attributes_event.data.parent_block_hash; + for (relay_set, last_finalize_command) in input.last_finalize_commands.iter() { let finalized_blocks = input.pre_finalized_multi_blocks.clone(); let cancellation_token = cancel.clone(); @@ -153,6 +170,7 @@ impl UnfinishedBuiltBlocksInputFactory

{ let relay_set = relay_set.clone(); let last_finalize_command = last_finalize_command.clone(); let block_sink = block_sink.clone(); + let block_observer = self.block_observer.clone(); std::thread::Builder::new() .name("finalize_worker".into()) .spawn(move || { @@ -163,6 +181,9 @@ impl UnfinishedBuiltBlocksInputFactory

{ last_finalize_command, adjust_finalized_blocks, cancellation_token, + block_observer, + slot, + parent_hash, ) }) .unwrap(); @@ -597,6 +618,7 @@ impl UnfinishedBuiltBlocksInput { // finalize_worker impl UnfinishedBuiltBlocksInput { + #[allow(clippy::too_many_arguments)] fn run_finalize_thread( relay_set: RelaySet, block_building_sink: Arc, @@ -604,6 +626,9 @@ impl UnfinishedBuiltBlocksInput { last_finalize_command: Arc>, adjust_finalized_blocks: bool, cancellation_token: CancellationToken, + block_observer: Option>, + slot: u64, + parent_hash: alloy_primitives::BlockHash, ) { loop { if cancellation_token.is_cancelled() { @@ -668,6 +693,12 @@ impl UnfinishedBuiltBlocksInput { result.block.trace.chosen_as_best_at = finalize_command.prefinalized_block.chosen_as_best_at; result.block.trace.sent_to_bidder = finalize_command.prefinalized_block.sent_to_bidder; + + // Notify block observer (EPBS bid provider) of the new block + if let Some(observer) = &block_observer { + observer.on_block_built(slot, parent_hash, &result.block); + } + block_building_sink.new_block(relay_set.clone(), result.block); } } diff --git a/crates/rbuilder/src/live_builder/builder_api/bid_provider.rs b/crates/rbuilder/src/live_builder/builder_api/bid_provider.rs new file mode 100644 index 000000000..fcde4e540 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/bid_provider.rs @@ -0,0 +1,465 @@ +//! EPBS Bid Provider - Integrates with the block building pipeline to generate bids. +//! +//! This module provides the `LiveEpbsBidProvider` which implements `EpbsBidProvider` +//! by connecting to the existing block building infrastructure. + +use alloy_primitives::{BlockHash, U256}; +use alloy_rpc_types_engine::ExecutionPayloadV3; +use parking_lot::RwLock; +use rbuilder_primitives::epbs::{ + CachedPayloadData, ExecutionPayloadBid, ExecutionRequests, GetBidParams, + SignedExecutionPayloadBid, +}; +use std::{collections::HashMap, sync::Arc, time::Instant}; +use tracing::{debug, info, trace}; + +use crate::{ + building::builders::Block, live_builder::block_output::block_observer::BlockObserver, + mev_boost::EpbsBidSigner, +}; +use alloy_primitives::Bytes; +use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2}; + +use super::EpbsBidProvider; + +/// Key for tracking best blocks by slot and parent. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SlotParentKey { + pub slot: u64, + pub parent_hash: BlockHash, +} + +impl SlotParentKey { + pub fn from_params(params: &GetBidParams) -> Self { + Self { + slot: params.slot, + parent_hash: params.parent_hash, + } + } +} + +/// Cached block data for generating EPBS bids. +#[derive(Debug, Clone)] +pub struct CachedBlockData { + /// The built block from the building pipeline. + pub block: Block, + /// When this block was cached. + pub cached_at: Instant, + /// Slot this block is for. + pub slot: u64, +} + +/// Config for the LiveEpbsBidProvider. +#[derive(Debug, Clone)] +pub struct LiveEpbsBidProviderConfig { + /// max number of blocks to cache. + pub max_cached_blocks: usize, + /// max age of a cached block before it's considered stale. + pub max_block_age_ms: u64, +} + +impl Default for LiveEpbsBidProviderConfig { + fn default() -> Self { + Self { + max_cached_blocks: 100, + max_block_age_ms: 12_000, // one slot, but maybe we can also update it? + } + } +} + +/// Live EPBS Bid Provider that integrates with the block building pipeline. +/// +/// This provider: +/// 1. Receives built blocks from the block building pipeline +/// 2. Tracks the best block for each slot/parent combination +/// 3. Generates SignedExecutionPayloadBid on request +/// 4. Caches full payloads for later revelation + +pub struct LiveEpbsBidProvider { + /// Configuration. + config: LiveEpbsBidProviderConfig, + /// The signer for creating signed bids. Optional to support lazy initialization. + /// Contains the builder_index (looked up from beacon chain by public key). + signer: Arc>>, + /// Best blocks by slot/parent key. + best_blocks: RwLock>, + /// Cache of full payloads for revelation, keyed by block_hash. + payload_cache: Arc>>, +} + +impl LiveEpbsBidProvider { + /// Create a new LiveEpbsBidProvider with a signer. + pub fn new(signer: EpbsBidSigner, config: LiveEpbsBidProviderConfig) -> Self { + Self { + config, + signer: Arc::new(RwLock::new(Some(signer))), + best_blocks: RwLock::new(HashMap::new()), + payload_cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Create a new uninitialized LiveEpbsBidProvider. + /// + /// The signer must be set later using `set_signer()` before bids can be generated. + /// The builder_index will be obtained from the signer once it's set. + pub fn new_uninitialized(config: LiveEpbsBidProviderConfig) -> Self { + Self { + config, + signer: Arc::new(RwLock::new(None)), + best_blocks: RwLock::new(HashMap::new()), + payload_cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Set the signer for this provider. + /// + /// This is used for lazy initialization when the builder_index and signing domain + /// need to be fetched from the beacon chain after startup. + pub fn set_signer(&self, signer: EpbsBidSigner) { + *self.signer.write() = Some(signer); + } + + /// Check if the signer is ready. + pub fn is_ready(&self) -> bool { + self.signer.read().is_some() + } + + /// Get the builder index (if signer is initialized). + pub fn builder_index(&self) -> Option { + self.signer.read().as_ref().map(|s| s.builder_index()) + } + + /// Get a shared reference to the signer for use by the P2P service. + pub fn shared_signer(&self) -> Arc>> { + self.signer.clone() + } + + /// Get a shared reference to the payload cache for use by the P2P reveal handler. + pub fn shared_payload_cache(&self) -> Arc>> { + self.payload_cache.clone() + } + + /// Notify the provider of a new built block. + /// + /// This should be called by the block building pipeline whenever a new + /// block is produced. The provider will track the best block for each + /// slot/parent combination. + pub fn on_new_block(&self, slot: u64, parent_hash: BlockHash, block: Block) { + let key = SlotParentKey { slot, parent_hash }; + + let cached = CachedBlockData { + block: block.clone(), + cached_at: Instant::now(), + slot, + }; + + let mut best_blocks = self.best_blocks.write(); + + // Check if this block is better than the current best + let should_update = match best_blocks.get(&key) { + Some(existing) => block.trace.bid_value > existing.block.trace.bid_value, + None => true, + }; + + if should_update { + info!( + slot, + ?parent_hash, + block_hash = ?block.sealed_block.hash(), + bid_value = %block.trace.bid_value, + cached_blocks = best_blocks.len() + 1, + "EPBS: Cached new best block for slot" + ); + best_blocks.insert(key, cached); + } + + // Cleanup old entries if we are over the limit + if best_blocks.len() > self.config.max_cached_blocks { + let now = Instant::now(); + best_blocks.retain(|_, v| { + now.duration_since(v.cached_at).as_millis() < self.config.max_block_age_ms as u128 + }); + } + } + + /// Get the best block for a given slot/parent combination. + pub fn get_best_block(&self, params: &GetBidParams) -> Option { + let key = SlotParentKey::from_params(params); + let best_blocks = self.best_blocks.read(); + best_blocks.get(&key).cloned() + } + + /// Convert a Block to an ExecutionPayloadBid. + fn block_to_bid( + block: &Block, + params: &GetBidParams, + builder_index: u64, + blob_kzg_commitments: Vec, + ) -> ExecutionPayloadBid { + // bid_value is in wei, we need gwei + let value_gwei = (block.trace.bid_value / U256::from(1_000_000_000u64)) + .try_into() + .unwrap_or(u64::MAX); + + ExecutionPayloadBid { + parent_block_hash: params.parent_hash, + parent_block_root: params.parent_root, + block_hash: block.sealed_block.hash(), + prev_randao: block.sealed_block.mix_hash, + fee_recipient: params.fee_recipient, + gas_limit: block.sealed_block.gas_limit, + builder_index, + slot: params.slot, + value: value_gwei, + execution_payment: 0, // In protocol payment + blob_kzg_commitments, + } + } + + // TODO: review implementation + /// Convert execution requests from EIP-7685 typed format to separated lists. + /// + /// EIP-7685 execution requests are prefixed with a type byte: + /// - 0x00: Deposit requests + /// - 0x01: Withdrawal requests + /// - 0x02: Consolidation requests + fn convert_execution_requests(requests: &[Bytes]) -> ExecutionRequests { + let mut deposits = Vec::new(); + let mut withdrawals = Vec::new(); + let mut consolidations = Vec::new(); + + for request in requests { + if request.is_empty() { + continue; + } + + let request_type = request[0]; + let request_data = Bytes::copy_from_slice(&request[1..]); + + match request_type { + 0x00 => deposits.push(request_data), + 0x01 => withdrawals.push(request_data), + 0x02 => consolidations.push(request_data), + _ => { + // Unknown request type - skip it + debug!(request_type, "Unknown execution request type, skipping"); + } + } + } + + ExecutionRequests { + deposits, + withdrawals, + consolidations, + } + } + + /// Cache the payload for later revelation. + fn cache_payload(&self, signed_bid: &SignedExecutionPayloadBid, block: &Block) { + let block_hash = signed_bid.message.block_hash; + + // Convert block to ExecutionPayloadV3 + let payload = self.block_to_execution_payload(block); + + // Extract blob commitments + let blob_kzg_commitments = self.extract_blob_commitments(block); + + let execution_requests = Self::convert_execution_requests(&block.execution_requests); + + let cached = CachedPayloadData::new( + signed_bid.clone(), + payload, + execution_requests, + blob_kzg_commitments, + ); + + self.payload_cache.write().insert(block_hash, cached); + + debug!(?block_hash, "Cached payload for revelation"); + } + + /// Convert a Block to ExecutionPayloadV3. + /// + /// TODO: Use proper conversion from rbuilder-primitives when available. + fn block_to_execution_payload(&self, block: &Block) -> ExecutionPayloadV3 { + let sealed = &block.sealed_block; + + // Extract transactions as raw bytes + let transactions: Vec = sealed + .body() + .transactions + .iter() + .map(|tx| { + let mut buf = Vec::new(); + alloy_eips::eip2718::Encodable2718::encode_2718(tx, &mut buf); + Bytes::from(buf) + }) + .collect(); + + // Extract withdrawals + let withdrawals = sealed + .body() + .withdrawals + .as_ref() + .map(|w| w.to_vec()) + .unwrap_or_default(); + + ExecutionPayloadV3 { + payload_inner: ExecutionPayloadV2 { + payload_inner: ExecutionPayloadV1 { + parent_hash: sealed.parent_hash, + fee_recipient: sealed.beneficiary, + state_root: sealed.state_root, + receipts_root: sealed.receipts_root, + logs_bloom: sealed.logs_bloom, + prev_randao: sealed.mix_hash, + block_number: sealed.number, + gas_limit: sealed.gas_limit, + gas_used: sealed.gas_used, + timestamp: sealed.timestamp, + extra_data: sealed.extra_data.clone(), + base_fee_per_gas: U256::from(sealed.base_fee_per_gas.unwrap_or_default()), + block_hash: sealed.hash(), + transactions, + }, + withdrawals, + }, + blob_gas_used: sealed.blob_gas_used.unwrap_or_default(), + excess_blob_gas: sealed.excess_blob_gas.unwrap_or_default(), + } + } + + /// Extract blob KZG commitments from a block. + fn extract_blob_commitments(&self, block: &Block) -> Vec { + let mut commitments = Vec::new(); + + for sidecar in &block.txs_blobs_sidecars { + match sidecar.as_ref() { + alloy_eips::eip7594::BlobTransactionSidecarVariant::Eip4844(s) => { + for commitment in &s.commitments { + commitments.push(alloy_primitives::Bytes::copy_from_slice( + commitment.as_slice(), + )); + } + } + alloy_eips::eip7594::BlobTransactionSidecarVariant::Eip7594(s) => { + for commitment in &s.commitments { + commitments.push(alloy_primitives::Bytes::copy_from_slice( + commitment.as_slice(), + )); + } + } + } + } + + commitments + } + + /// Get a cached payload by block hash. + pub fn get_cached_payload(&self, block_hash: &BlockHash) -> Option { + self.payload_cache.read().get(block_hash).cloned() + } + + /// Cleanup stale cache entries. + pub fn cleanup(&self) { + let now = Instant::now(); + let max_age_ms = self.config.max_block_age_ms as u128; + + self.best_blocks + .write() + .retain(|_, v| now.duration_since(v.cached_at).as_millis() < max_age_ms); + + self.payload_cache.write().retain(|_, v| { + v.created_at.elapsed().as_millis() < max_age_ms * 2 // Keep payloads longer + }); + } +} + +impl std::fmt::Debug for LiveEpbsBidProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LiveEpbsBidProvider") + .field("config", &self.config) + .field("builder_index", &self.builder_index()) + .field("signer_ready", &self.is_ready()) + .field("cached_blocks", &self.best_blocks.read().len()) + .field("cached_payloads", &self.payload_cache.read().len()) + .finish() + } +} + +/// Implement BlockObserver so that the block building pipeline can notify us of new blocks. +impl BlockObserver for LiveEpbsBidProvider { + fn on_block_built(&self, slot: u64, parent_hash: BlockHash, block: &Block) { + self.on_new_block(slot, parent_hash, block.clone()); + } +} + +#[async_trait::async_trait] +impl EpbsBidProvider for LiveEpbsBidProvider { + async fn generate_bid( + &self, + params: &GetBidParams, + ) -> eyre::Result> { + // Check if signer is ready + let signer = self.signer.read(); + let signer = match signer.as_ref() { + Some(s) => s, + None => { + debug!( + slot = params.slot, + "EPBS signer not yet initialized, cannot generate bid" + ); + return Ok(None); + } + }; + + // Get the best block for this slot/parent + let cached_block = match self.get_best_block(params) { + Some(cached) => cached, + None => { + trace!( + slot = params.slot, + ?params.parent_hash, + "No block available for bid request" + ); + return Ok(None); + } + }; + + // Check if the block is too old + let age_ms = cached_block.cached_at.elapsed().as_millis(); + if age_ms > self.config.max_block_age_ms as u128 { + debug!( + slot = params.slot, + age_ms, "Best block is stale, not returning bid" + ); + return Ok(None); + } + + // extract blob commitments + let blob_kzg_commitments = self.extract_blob_commitments(&cached_block.block); + + // Create the bid + let bid = Self::block_to_bid( + &cached_block.block, + params, + signer.builder_index(), + blob_kzg_commitments, + ); + + // Sign the bid + let signed_bid = signer.sign_bid(&bid)?; + + info!( + slot = params.slot, + block_hash = ?signed_bid.message.block_hash, + value = signed_bid.message.value, + "Generated EPBS bid" + ); + + // Cache the payload for later revelation + self.cache_payload(&signed_bid, &cached_block.block); + + Ok(Some(signed_bid)) + } +} diff --git a/crates/rbuilder/src/live_builder/builder_api/handlers.rs b/crates/rbuilder/src/live_builder/builder_api/handlers.rs new file mode 100644 index 000000000..fa27fdbb2 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/handlers.rs @@ -0,0 +1,179 @@ +use alloy_primitives::{Address, B256}; +use axum::{ + extract::{Path, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, + Json, +}; +use rbuilder_primitives::epbs::{GetBidParams, GetExecutionPayloadBidResponse}; +use std::sync::Arc; +use tracing::{error, info, trace}; + +use super::EpbsBuilderState; + +/// GET /eth/v1/builder/execution_payload_bid/{slot}/{parent_hash}/{parent_root}/{proposer_index} +/// +/// returns a SignedExecutionPayloadBid for the given slot. +pub async fn get_execution_payload_bid_handler( + State(state): State>, + Path((slot, parent_hash, parent_root, proposer_index)): Path<(u64, String, String, u64)>, + headers: HeaderMap, +) -> Result { + // Parse path parameters + let parent_hash = + parse_hash(&parent_hash).map_err(|_| GetExecutionPayloadBidError::InvalidParentHash)?; + let parent_root = + parse_hash(&parent_root).map_err(|_| GetExecutionPayloadBidError::InvalidParentRoot)?; + + // Parse headers + let fee_recipient = parse_fee_recipient(&headers)?; + let timeout_ms = parse_timeout_ms(&headers); + let date_milliseconds = parse_date_milliseconds(&headers); + + let params = GetBidParams { + slot, + parent_hash: parent_hash.into(), + parent_root, + proposer_index, + fee_recipient, + timeout_ms, + date_milliseconds, + }; + + trace!( + slot = params.slot, + proposer_index = params.proposer_index, + ?params.parent_hash, + ?params.fee_recipient, + "Received get_execution_payload_bid request" + ); + + // Get the best bid from the builder + match state.get_execution_payload_bid(¶ms).await { + Ok(Some(signed_bid)) => { + info!( + slot = params.slot, + block_hash = ?signed_bid.message.block_hash, + value = signed_bid.message.value, + "Returning execution payload bid" + ); + + let response = GetExecutionPayloadBidResponse { + version: "gloas".to_string(), + data: signed_bid, + }; + + Ok(( + StatusCode::OK, + [("Eth-Consensus-Version", "gloas")], + Json(response), + )) + } + Ok(None) => { + trace!(slot = params.slot, "No execution payload bid available"); + Err(GetExecutionPayloadBidError::NoBidAvailable) + } + Err(e) => { + error!(slot = params.slot, error = ?e, "Error generating execution payload bid"); + Err(GetExecutionPayloadBidError::InternalError(e.to_string())) + } + } +} + +/// Error type for get_execution_payload_bid handler. +#[derive(Debug)] +pub enum GetExecutionPayloadBidError { + InvalidParentHash, + InvalidParentRoot, + InvalidFeeRecipient, + MissingFeeRecipient, + NoBidAvailable, + InternalError(String), +} + +impl IntoResponse for GetExecutionPayloadBidError { + fn into_response(self) -> axum::response::Response { + let (status, message) = match self { + GetExecutionPayloadBidError::InvalidParentHash => { + (StatusCode::BAD_REQUEST, "Invalid parent_hash".to_string()) + } + GetExecutionPayloadBidError::InvalidParentRoot => { + (StatusCode::BAD_REQUEST, "Invalid parent_root".to_string()) + } + GetExecutionPayloadBidError::InvalidFeeRecipient => ( + StatusCode::BAD_REQUEST, + "Invalid X-Fee-Recipient header".to_string(), + ), + GetExecutionPayloadBidError::MissingFeeRecipient => ( + StatusCode::BAD_REQUEST, + "Missing required X-Fee-Recipient header".to_string(), + ), + GetExecutionPayloadBidError::NoBidAvailable => { + // Per spec, return 204 No Content when no bid is available + return StatusCode::NO_CONTENT.into_response(); + } + GetExecutionPayloadBidError::InternalError(msg) => { + (StatusCode::INTERNAL_SERVER_ERROR, msg) + } + }; + + let body = serde_json::json!({ + "code": status.as_u16(), + "message": message + }); + + (status, Json(body)).into_response() + } +} + +// Helper functions for parsing request parameters + +fn parse_hash(s: &str) -> Result { + // Strip 0x prefix if present + let s = s.strip_prefix("0x").unwrap_or(s); + let bytes = hex::decode(s).map_err(|_| ())?; + if bytes.len() != 32 { + return Err(()); + } + let mut arr = [0u8; 32]; + arr.copy_from_slice(&bytes); + Ok(B256::from(arr)) +} + +fn parse_fee_recipient(headers: &HeaderMap) -> Result { + let header_value = headers + .get("X-Fee-Recipient") + .ok_or(GetExecutionPayloadBidError::MissingFeeRecipient)? + .to_str() + .map_err(|_| GetExecutionPayloadBidError::InvalidFeeRecipient)?; + + let s = header_value.strip_prefix("0x").unwrap_or(header_value); + let bytes = hex::decode(s).map_err(|_| GetExecutionPayloadBidError::InvalidFeeRecipient)?; + if bytes.len() != 20 { + return Err(GetExecutionPayloadBidError::InvalidFeeRecipient); + } + let mut arr = [0u8; 20]; + arr.copy_from_slice(&bytes); + Ok(Address::from(arr)) +} + +fn parse_timeout_ms(headers: &HeaderMap) -> Option { + headers + .get("X-Timeout-Ms") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) +} + +fn parse_date_milliseconds(headers: &HeaderMap) -> Option { + headers + .get("Date-Milliseconds") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) +} + +/// GET /eth/v1/builder/status +/// +/// Returns 200 OK if the builder is healthy. +pub async fn status_handler() -> StatusCode { + StatusCode::OK +} diff --git a/crates/rbuilder/src/live_builder/builder_api/mod.rs b/crates/rbuilder/src/live_builder/builder_api/mod.rs new file mode 100644 index 000000000..24d427cf8 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/mod.rs @@ -0,0 +1,13 @@ +mod bid_provider; +mod handlers; +pub mod p2p; +mod server; + +pub use bid_provider::{ + CachedBlockData, LiveEpbsBidProvider, LiveEpbsBidProviderConfig, SlotParentKey, +}; +pub use handlers::{get_execution_payload_bid_handler, GetExecutionPayloadBidError}; +pub use p2p::{EpbsP2PConfig, EpbsP2PService}; +pub use server::{ + EpbsBidProvider, EpbsBuilderServer, EpbsBuilderServerConfig, EpbsBuilderState, +}; diff --git a/crates/rbuilder/src/live_builder/builder_api/p2p/bid_tracker.rs b/crates/rbuilder/src/live_builder/builder_api/p2p/bid_tracker.rs new file mode 100644 index 000000000..17ede6ece --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/p2p/bid_tracker.rs @@ -0,0 +1,202 @@ +use alloy_primitives::{BlockHash, B256}; +use parking_lot::RwLock; +use rbuilder_primitives::epbs::SignedExecutionPayloadBid; +use std::collections::HashMap; +use tracing::debug; + +// TODO: revisit key construction +/// Key for tracking bids +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct BidKey { + slot: u64, + parent_block_hash: BlockHash, + parent_block_root: B256, +} + +impl BidKey { + fn from_bid(bid: &rbuilder_primitives::epbs::ExecutionPayloadBid) -> Self { + Self { + slot: bid.slot, + parent_block_hash: bid.parent_block_hash, + parent_block_root: bid.parent_block_root, + } + } +} + +/// Tracks competing bids and our own bids per slot from execution_payload_bid SSE to[ic] +#[derive(Debug)] +pub struct BidTracker { + /// Highest bid seen per (slot, parent_hash, parent_root). + highest_bids: RwLock>, + /// Our own submitted bids: slot -> latest bid. + our_bids: RwLock>, + // TODO: maybe we can remove this but keep for now + /// Our builder index for identifying our own bids. + our_builder_index: u64, +} + +impl BidTracker { + pub fn new(our_builder_index: u64) -> Self { + Self { + highest_bids: RwLock::new(HashMap::new()), + our_bids: RwLock::new(HashMap::new()), + our_builder_index, + } + } + + /// Process an incoming bid from the SSE stream. + /// Returns true if this bid is the new highest for its key. + pub fn on_bid_received(&self, bid: &SignedExecutionPayloadBid) -> bool { + let key = BidKey::from_bid(&bid.message); + let mut highest = self.highest_bids.write(); + + let is_new_highest = match highest.get(&key) { + Some(existing) => bid.message.value > existing.message.value, + None => true, + }; + + if is_new_highest { + debug!( + slot = bid.message.slot, + builder_index = bid.message.builder_index, + value = bid.message.value, + "New highest bid received" + ); + highest.insert(key, bid.clone()); + } + + is_new_highest + } + + /// Record a bid we submitted. + pub fn on_bid_submitted(&self, bid: &SignedExecutionPayloadBid) { + let slot = bid.message.slot; + self.our_bids.write().insert(slot, bid.clone()); + // Also track it as potentially the highest + self.on_bid_received(bid); + } + + /// Get the highest competing bid for a given slot/parent combination. + pub fn highest_bid( + &self, + slot: u64, + parent_hash: &BlockHash, + parent_root: &B256, + ) -> Option { + let key = BidKey { + slot, + parent_block_hash: *parent_hash, + parent_block_root: *parent_root, + }; + self.highest_bids.read().get(&key).cloned() + } + + /// Get our latest submitted bid for a slot. + pub fn our_bid(&self, slot: u64) -> Option { + self.our_bids.read().get(&slot).cloned() + } + + /// Check if we are currently the highest bidder for a slot/parent. + pub fn are_we_winning( + &self, + slot: u64, + parent_hash: &BlockHash, + parent_root: &B256, + ) -> bool { + let key = BidKey { + slot, + parent_block_hash: *parent_hash, + parent_block_root: *parent_root, + }; + self.highest_bids + .read() + .get(&key) + .map(|bid| bid.message.builder_index == self.our_builder_index) + .unwrap_or(false) + } + + /// Clean up entries older than current_slot. + pub fn cleanup(&self, current_slot: u64) { + self.highest_bids + .write() + .retain(|key, _| key.slot >= current_slot.saturating_sub(2)); + self.our_bids + .write() + .retain(|&slot, _| slot >= current_slot.saturating_sub(2)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::Address; + use alloy_rpc_types_beacon::BlsSignature; + + fn make_bid(slot: u64, builder_index: u64, value: u64) -> SignedExecutionPayloadBid { + SignedExecutionPayloadBid { + message: rbuilder_primitives::epbs::ExecutionPayloadBid { + parent_block_hash: BlockHash::ZERO, + parent_block_root: B256::ZERO, + block_hash: BlockHash::ZERO, + prev_randao: B256::ZERO, + fee_recipient: Address::ZERO, + gas_limit: 30_000_000, + builder_index, + slot, + value, + execution_payment: 0, + blob_kzg_commitments: vec![], + }, + signature: BlsSignature::default(), + } + } + + #[test] + fn test_bid_tracking() { + let tracker = BidTracker::new(1); + + // 1st bid from builder 2 + let bid1 = make_bid(100, 2, 1000); + assert!(tracker.on_bid_received(&bid1)); + + // higher bid from builder 3 + let bid2 = make_bid(100, 3, 2000); + assert!(tracker.on_bid_received(&bid2)); + + // lower bid should not replace + let bid3 = make_bid(100, 4, 500); + assert!(!tracker.on_bid_received(&bid3)); + + let highest = tracker + .highest_bid(100, &BlockHash::ZERO, &B256::ZERO) + .unwrap(); + assert_eq!(highest.message.value, 2000); + } + + #[test] + fn test_our_bid_tracking() { + let tracker = BidTracker::new(1); + + let our_bid = make_bid(100, 1, 1500); + tracker.on_bid_submitted(&our_bid); + + assert!(tracker.our_bid(100).is_some()); + assert!(tracker.are_we_winning(100, &BlockHash::ZERO, &B256::ZERO)); + + // someone outbided us + let higher_bid = make_bid(100, 2, 2000); + tracker.on_bid_received(&higher_bid); + assert!(!tracker.are_we_winning(100, &BlockHash::ZERO, &B256::ZERO)); + } + + #[test] + fn test_cleanup() { + let tracker = BidTracker::new(1); + tracker.on_bid_submitted(&make_bid(10, 1, 100)); + tracker.on_bid_submitted(&make_bid(20, 1, 200)); + + tracker.cleanup(20); + assert!(tracker.our_bid(10).is_none()); + assert!(tracker.our_bid(20).is_some()); + } +} diff --git a/crates/rbuilder/src/live_builder/builder_api/p2p/mod.rs b/crates/rbuilder/src/live_builder/builder_api/p2p/mod.rs new file mode 100644 index 000000000..892205380 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/p2p/mod.rs @@ -0,0 +1,9 @@ +pub mod bid_tracker; +pub mod proposer_prefs; +pub mod reveal_handler; +pub mod scheduler; +pub mod service; +pub mod types; + +pub use service::EpbsP2PService; +pub use types::EpbsP2PConfig; diff --git a/crates/rbuilder/src/live_builder/builder_api/p2p/proposer_prefs.rs b/crates/rbuilder/src/live_builder/builder_api/p2p/proposer_prefs.rs new file mode 100644 index 000000000..25b20bfb3 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/p2p/proposer_prefs.rs @@ -0,0 +1,86 @@ +use parking_lot::RwLock; +use rbuilder_primitives::epbs::ProposerPreferences; +use std::collections::HashMap; +use tracing::debug; + +/// Cache of proposer preferences keyed by proposal slot. +#[derive(Debug)] +pub struct ProposerPreferencesCache { + /// slot -> ProposerPreferences mapping. + prefs: RwLock>, +} + +impl ProposerPreferencesCache { + pub fn new() -> Self { + Self { + prefs: RwLock::new(HashMap::new()), + } + } + + /// Insert or update preferences for a slot. + pub fn insert(&self, prefs: ProposerPreferences) { + let slot = prefs.proposal_slot; + debug!( + slot, + validator_index = prefs.validator_index, + fee_recipient = %prefs.fee_recipient, + gas_limit = prefs.gas_limit, + "Cached proposer preferences" + ); + self.prefs.write().insert(slot, prefs); + } + + pub fn get(&self, slot: u64) -> Option { + self.prefs.read().get(&slot).cloned() + } + + /// cleanup up preferences older than current_slot - max_age_slots. + pub fn cleanup(&self, current_slot: u64, max_age_slots: u64) { + let cutoff = current_slot.saturating_sub(max_age_slots); + self.prefs.write().retain(|&slot, _| slot >= cutoff); + } + + pub fn len(&self) -> usize { + self.prefs.read().len() + } + + pub fn is_empty(&self) -> bool { + self.prefs.read().is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::Address; + + fn make_prefs(slot: u64) -> ProposerPreferences { + ProposerPreferences { + proposal_slot: slot, + validator_index: 42, + fee_recipient: Address::ZERO, + gas_limit: 30_000_000, + } + } + + #[test] + fn test_insert_and_get() { + let cache = ProposerPreferencesCache::new(); + cache.insert(make_prefs(100)); + assert!(cache.get(100).is_some()); + assert!(cache.get(101).is_none()); + } + + #[test] + fn test_cleanup() { + let cache = ProposerPreferencesCache::new(); + cache.insert(make_prefs(10)); + cache.insert(make_prefs(20)); + cache.insert(make_prefs(30)); + cache.cleanup(30, 15); + // slot 10 is < 30 - 15 = 15, should be removed + assert!(cache.get(10).is_none()); + assert!(cache.get(20).is_some()); + assert!(cache.get(30).is_some()); + } +} diff --git a/crates/rbuilder/src/live_builder/builder_api/p2p/reveal_handler.rs b/crates/rbuilder/src/live_builder/builder_api/p2p/reveal_handler.rs new file mode 100644 index 000000000..b15e4ec7b --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/p2p/reveal_handler.rs @@ -0,0 +1,148 @@ +use alloy_primitives::{BlockHash, B256}; +use parking_lot::RwLock; +use rbuilder_primitives::epbs::{CachedPayloadData, SignedExecutionPayloadBid}; +use std::{collections::HashMap, sync::Arc}; +use tracing::{info, warn}; + +use crate::{beacon_api_client::Client, mev_boost::EpbsBidSigner}; + +/// Handles payload revelation after bid inclusion in a beacon block. +pub struct RevealHandler { + /// Beacon api client for constructing and submitting envelopes. + beacon_client: Client, + /// Shared signer for envelope signing. + signer: Arc>>, + /// Shared payload cache from the bid provider. + payload_cache: Arc>>, +} + +impl RevealHandler { + pub fn new( + beacon_client: Client, + signer: Arc>>, + payload_cache: Arc>>, + ) -> Self { + Self { + beacon_client, + signer, + payload_cache, + } + } + + /// Called when a head event indicates our bid was included in a beacon block. + pub async fn on_bid_included( + &self, + slot: u64, + beacon_block_root: B256, + included_bid: &SignedExecutionPayloadBid, + ) -> eyre::Result<()> { + let block_hash = included_bid.message.block_hash; + + info!( + slot, + ?block_hash, + ?beacon_block_root, + builder_index = included_bid.message.builder_index, + "Our bid was included, starting payload reveal" + ); + + // 1. look up the cached payload + let cached = self + .payload_cache + .read() + .get(&block_hash) + .cloned() + .ok_or_else(|| { + eyre::eyre!( + "No cached payload found for block_hash {:?} at slot {}", + block_hash, + slot + ) + })?; + + // 2. construct envelope via beacon node + // retrying up to 3 times before giving up. + //(TODO: check if the PR gets merged which computes state_root). + let envelope = { + let mut result = None; + for attempt in 1..=3 { + match self + .beacon_client + .construct_execution_payload_envelope( + beacon_block_root, + &cached.payload, + &cached.execution_requests, + ) + .await + { + Ok(envelope) => { + info!( + slot, + ?block_hash, + state_root = ?envelope.state_root, + "Beacon node constructed envelope with state_root" + ); + result = Some(envelope); + break; + } + Err(e) => { + warn!( + slot, + ?block_hash, + attempt, + error = %e, + "Failed to construct envelope via beacon node, retrying..." + ); + if attempt < 3 { + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + } + } + } + result.ok_or_else(|| { + eyre::eyre!( + "Failed to construct envelope after 3 attempts for slot {}. \ + Cannot reveal without valid state_root.", + slot, + ) + })? + }; + + // 3. sign the envelope + let signed_envelope = { + let signer_guard = self.signer.read(); + let signer = signer_guard + .as_ref() + .ok_or_else(|| eyre::eyre!("Signer not initialized"))?; + signer.sign_envelope(&envelope)? + }; + + info!( + slot, + ?block_hash, + ?beacon_block_root, + "Submitting signed execution payload envelope" + ); + + // 4. submit to p2p via beacon api + self.beacon_client + .submit_execution_payload_envelope(&signed_envelope) + .await?; + + info!( + slot, + ?block_hash, + "Successfully submitted execution payload envelope" + ); + + Ok(()) + } +} + +impl std::fmt::Debug for RevealHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RevealHandler") + .field("beacon_client", &self.beacon_client) + .finish() + } +} diff --git a/crates/rbuilder/src/live_builder/builder_api/p2p/scheduler.rs b/crates/rbuilder/src/live_builder/builder_api/p2p/scheduler.rs new file mode 100644 index 000000000..408ec5e07 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/p2p/scheduler.rs @@ -0,0 +1,170 @@ +use super::types::EpbsP2PConfig; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// Manages bid timing within slots. +/// Manages time-based bid submission within slot boundaries. +/// Supports two modes: +/// interval mode: resubmit bids at regular intervals within the bidding window +/// single bid mode: submit once when payload is ready +#[derive(Debug, Clone)] +pub struct BidScheduler { + /// Genesis time in seconds since Unix epoch. + genesis_time: u64, + /// Slot duration in seconds. + seconds_per_slot: u64, + /// MS into slot to start bidding. + bid_start_ms: u64, + /// MS into slot to stop bidding. + bid_end_ms: u64, + /// Interval between bid resubmissions, 0 = single bid mode. + bid_interval_ms: u64, +} + +impl BidScheduler { + pub fn new(config: &EpbsP2PConfig) -> Self { + Self { + genesis_time: config.genesis_time, + seconds_per_slot: config.seconds_per_slot, + bid_start_ms: config.bid_start_ms, + bid_end_ms: config.bid_end_ms, + bid_interval_ms: config.bid_interval_ms, + } + } + + /// Returns the slot start time as seconds since unix epoch. + pub fn slot_start_time(&self, slot: u64) -> u64 { + self.genesis_time + slot * self.seconds_per_slot + } + + /// Returns ms elapsed since the start of the given slot. + /// Returns None if the slot hasn't started yet. + pub fn ms_into_slot(&self, slot: u64) -> Option { + let slot_start = self.slot_start_time(slot); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + let now_ms = now.as_millis() as u64; + let slot_start_ms = slot_start * 1000; + + if now_ms >= slot_start_ms { + Some(now_ms - slot_start_ms) + } else { + None + } + } + + /// Returns true if we are currently within the biding window for the given slot. + pub fn is_in_bidding_window(&self, slot: u64) -> bool { + match self.ms_into_slot(slot) { + Some(ms) => ms >= self.bid_start_ms && ms < self.bid_end_ms, + None => false, + } + } + + /// Returns the duration until the bidding window opens for the given slot. + /// Returns Duration::ZERO if the window is already open. + /// Returns None if the bidding window has already closed. + pub fn time_until_bid_start(&self, slot: u64) -> Option { + let slot_start_ms = self.slot_start_time(slot) * 1000; + let bid_open_ms = slot_start_ms + self.bid_start_ms; + let bid_close_ms = slot_start_ms + self.bid_end_ms; + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + if now_ms >= bid_close_ms { + // window already closed + return None; + } + + if now_ms >= bid_open_ms { + // window already open + return Some(Duration::ZERO); + } + + Some(Duration::from_millis(bid_open_ms - now_ms)) + } + + /// Returns the duration until the bidding window closes for the given slot. + /// Returns None if the window has already closed. + pub fn time_until_bid_end(&self, slot: u64) -> Option { + let slot_start_ms = self.slot_start_time(slot) * 1000; + let bid_close_ms = slot_start_ms + self.bid_end_ms; + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + if now_ms >= bid_close_ms { + return None; + } + + Some(Duration::from_millis(bid_close_ms - now_ms)) + } + + /// Whether this scheduler is in single bid mode i.e no resubmission in the slot. + pub fn is_single_bid_mode(&self) -> bool { + self.bid_interval_ms == 0 + } + + /// Returns the bid resubmission interval or none in single bid mode. + pub fn bid_interval(&self) -> Option { + if self.bid_interval_ms == 0 { + None + } else { + Some(Duration::from_millis(self.bid_interval_ms)) + } + } + + /// Compute the current slot from wall clock time. + pub fn current_slot(&self) -> u64 { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + if now < self.genesis_time { + return 0; + } + (now - self.genesis_time) / self.seconds_per_slot + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_scheduler(genesis_time: u64) -> BidScheduler { + BidScheduler { + genesis_time, + seconds_per_slot: 12, + bid_start_ms: 0, + bid_end_ms: 4000, + bid_interval_ms: 500, + } + } + + #[test] + fn test_slot_start_time() { + let s = make_scheduler(1_000_000); + assert_eq!(s.slot_start_time(0), 1_000_000); + assert_eq!(s.slot_start_time(1), 1_000_012); + assert_eq!(s.slot_start_time(100), 1_001_200); + } + + #[test] + fn test_single_bid_mode() { + let mut s = make_scheduler(0); + assert!(!s.is_single_bid_mode()); + s.bid_interval_ms = 0; + assert!(s.is_single_bid_mode()); + } + + #[test] + fn test_bid_interval() { + let s = make_scheduler(0); + assert_eq!(s.bid_interval(), Some(Duration::from_millis(500))); + } +} diff --git a/crates/rbuilder/src/live_builder/builder_api/p2p/service.rs b/crates/rbuilder/src/live_builder/builder_api/p2p/service.rs new file mode 100644 index 000000000..9d469a731 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/p2p/service.rs @@ -0,0 +1,531 @@ +//! Main P2P ePBS builder service orchestrator. +//! +//! This service coordinates the full P2P builder flow: +//! 1. Subscribes to SSE events from the beacon node (head, bids, proposer preferences) +//! 2. Submits bids to the beacon node for P2P gossip on a schedule +//! 3. Monitors for bid inclusion in beacon blocks +//! 4. Triggers payload envelope revelation after bid inclusion + +use super::{ + bid_tracker::BidTracker, + proposer_prefs::ProposerPreferencesCache, + reveal_handler::RevealHandler, + scheduler::BidScheduler, + types::EpbsP2PConfig, +}; +use crate::{ + beacon_api_client::{ + Client, ExecutionPayloadBidTopic, HeadEvent, HeadTopic, ProposerPreferencesTopic, + }, + live_builder::builder_api::{EpbsBidProvider, LiveEpbsBidProvider}, + mev_boost::EpbsBidSigner, +}; +use alloy_primitives::{BlockHash, B256}; +use futures::StreamExt; +use parking_lot::RwLock; +use rbuilder_primitives::epbs::{GetBidParams, SignedExecutionPayloadBid}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +/// Main p2p epbs builder service. +/// This service coordinates the full P2P builder flow: +/// Noting all the duties it currently takes care of +/// 1. Subscribes to SSE events from the beacon node (head, bids, proposer preferences) +/// 2. Submits bids to the beacon node for P2P gossip on a schedule +/// 3. Monitors for bid inclusion in beacon blocks +/// 4. Triggers payload envelope revelation after bid inclusion +pub struct EpbsP2PService { + config: EpbsP2PConfig, + beacon_client: Client, + bid_provider: Arc, + signer: Arc>>, + payload_cache: Arc>>, +} + +impl EpbsP2PService { + pub fn new( + config: EpbsP2PConfig, + beacon_client: Client, + bid_provider: Arc, + signer: Arc>>, + payload_cache: Arc< + RwLock>, + >, + ) -> Self { + Self { + config, + beacon_client, + bid_provider, + signer, + payload_cache, + } + } + + /// Run the P2P builder service. + /// + /// This spawns SSE listener tasks and runs the main event loop until cancelled. + pub async fn run(self, cancel: CancellationToken) -> eyre::Result<()> { + info!("Starting EPBS P2P builder service"); + + loop { + if cancel.is_cancelled() { + return Ok(()); + } + if self.signer.read().is_some() { + break; + } + debug!("Waiting for EPBS signer initialization..."); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + + let builder_index = self + .signer + .read() + .as_ref() + .map(|s| s.builder_index()) + .unwrap_or(0); + + info!(builder_index, "EPBS P2P service signer ready"); + + let scheduler = BidScheduler::new(&self.config); + let bid_tracker = Arc::new(BidTracker::new(builder_index)); + let prefs_cache = Arc::new(ProposerPreferencesCache::new()); + let reveal_handler = RevealHandler::new( + self.beacon_client.clone(), + self.signer.clone(), + self.payload_cache.clone(), + ); + + // channel for head events from sse listener + // TODO: spinning unbounded channel for now. revist again. + let (head_tx, mut head_rx) = mpsc::unbounded_channel::(); + // channel for bid events from sse listener + // TODO: spinning unbounded channel for now. revist again. + let (bid_tx, mut bid_rx) = mpsc::unbounded_channel::(); + + // spawning the sse listeners for head and bids + let head_handle = { + let client = self.beacon_client.clone(); + let cancel = cancel.clone(); + let tx = head_tx; + tokio::spawn(async move { + Self::run_head_listener(client, tx, cancel).await; + }) + }; + + let bid_handle = { + let client = self.beacon_client.clone(); + let cancel = cancel.clone(); + let tx = bid_tx; + tokio::spawn(async move { + Self::run_bid_listener(client, tx, cancel).await; + }) + }; + + let prefs_handle = { + let client = self.beacon_client.clone(); + let cancel = cancel.clone(); + let cache = prefs_cache.clone(); + tokio::spawn(async move { + Self::run_prefs_listener(client, cache, cancel).await; + }) + }; + + // main event loop + let mut current_slot = scheduler.current_slot(); + let mut bid_interval = self.create_bid_interval(); + + info!(current_slot, "EPBS P2P service entering main loop"); + + loop { + tokio::select! { + _ = cancel.cancelled() => { + info!("EPBS P2P service shutting down"); + break; + } + + // head event recevied, check if our bid was included + Some(head_event) = head_rx.recv() => { + let new_slot = head_event.slot; + if new_slot > current_slot { + debug!(old_slot = current_slot, new_slot, "New slot detected"); + current_slot = new_slot; + bid_tracker.cleanup(current_slot); + //TODO keeping for 2 epochs think again about it. + prefs_cache.cleanup(current_slot, 64); + } + + // check if our bid was included in this block + self.handle_head_event( + &head_event, + &bid_tracker, + &reveal_handler, + builder_index, + ).await; + } + + // competing bid received + Some(bid) = bid_rx.recv() => { + let is_new_highest = bid_tracker.on_bid_received(&bid); + if is_new_highest && bid.message.builder_index != builder_index { + debug!( + slot = bid.message.slot, + competing_value = bid.message.value, + competing_builder = bid.message.builder_index, + "Outbid by competing builder" + ); + } + } + + // submit/resubmit bid + _ = bid_interval.tick() => { + let target_slot = current_slot + 1; + if scheduler.is_in_bidding_window(target_slot) || scheduler.is_in_bidding_window(current_slot) { + let bid_slot = if scheduler.is_in_bidding_window(target_slot) { + target_slot + } else { + current_slot + }; + + if let Err(e) = self.submit_bid( + bid_slot, + &prefs_cache, + &bid_tracker, + ).await { + debug!(slot = bid_slot, error = %e, "Failed to submit bid"); + } + } + } + } + } + + // cleanup all + head_handle.abort(); + bid_handle.abort(); + prefs_handle.abort(); + + info!("EPBS P2P service stopped"); + Ok(()) + } + + /// Handle a head event by checking if our bid was included. + async fn handle_head_event( + &self, + head_event: &HeadEvent, + bid_tracker: &BidTracker, + reveal_handler: &RevealHandler, + builder_index: u64, + ) { + let slot = head_event.slot; + let block_root = head_event.block; + + // check if we had a bid for this slot + let our_bid = match bid_tracker.our_bid(slot) { + Some(bid) => bid, + None => return, // we didn't bid for this slot + }; + + // query the beacon node to see which bid was included + let block_root_hex = format!("0x{}", hex::encode(block_root)); + match self + .beacon_client + .get_beacon_block_bid(&block_root_hex) + .await + { + Ok(Some(included_bid)) => { + if included_bid.message.builder_index == builder_index + && included_bid.message.block_hash == our_bid.message.block_hash + { + info!( + slot, + ?block_root, + "Our bid was included in the beacon block, triggering reveal" + ); + + if let Err(e) = reveal_handler + .on_bid_included(slot, block_root, &included_bid) + .await + { + error!(slot, error = %e, "Failed to reveal payload"); + } + } else { + debug!( + slot, + included_builder = included_bid.message.builder_index, + "Different builder's bid was included" + ); + } + } + Ok(None) => { + debug!(slot, "No bid found in beacon block body"); + } + Err(e) => { + warn!(slot, error = %e, "Failed to query beacon block for bid"); + } + } + } + + /// Generate and submit a bid for the given slot. + async fn submit_bid( + &self, + slot: u64, + prefs_cache: &ProposerPreferencesCache, + bid_tracker: &BidTracker, + ) -> eyre::Result<()> { + // retireve proposer preferences for this slot, should be cached already + let prefs = prefs_cache.get(slot).ok_or_else(|| { + eyre::eyre!( + "No proposer preferences found for slot {}, skipping P2P bid", + slot + ) + })?; + + let params = GetBidParams { + slot, + // bid provider uses the best cached blocks parent_hash + parent_hash: BlockHash::ZERO, // this will be overridden by providers cached data + parent_root: B256::ZERO, // this will be overridden by providers cached data + proposer_index: prefs.validator_index, + fee_recipient: prefs.fee_recipient, + timeout_ms: None, + date_milliseconds: None, + }; + + // generate bid via the bid provider + let signed_bid = self + .bid_provider + .generate_bid(¶ms) + .await? + .ok_or_else(|| eyre::eyre!("No bid available for slot {}", slot))?; + + // validate P2P gossip rules per consensus-specs p2p rules + // [REJECT] execution_payment must be 0 for P2P gossip + if signed_bid.message.execution_payment != 0 { + return Err(eyre::eyre!( + "Bid has non-zero execution_payment ({}), cannot broadcast via P2P", + signed_bid.message.execution_payment + )); + } + + // [REJECT] fee_recipient must match proposer preferences + if signed_bid.message.fee_recipient != prefs.fee_recipient { + return Err(eyre::eyre!( + "Bid fee_recipient {:?} does not match proposer preferences {:?}", + signed_bid.message.fee_recipient, + prefs.fee_recipient + )); + } + + // [REJECT] gas_limit must match proposer preferences + if signed_bid.message.gas_limit != prefs.gas_limit { + return Err(eyre::eyre!( + "Bid gas_limit {} does not match proposer preferences {}", + signed_bid.message.gas_limit, + prefs.gas_limit + )); + } + + // [REJECT] blob_kzg_commitments length must not exceed MAX_BLOB_COMMITMENTS_PER_BLOCK + const MAX_BLOB_COMMITMENTS_PER_BLOCK: usize = 4096; + if signed_bid.message.blob_kzg_commitments.len() > MAX_BLOB_COMMITMENTS_PER_BLOCK { + return Err(eyre::eyre!( + "Bid has {} blob_kzg_commitments, exceeds max {}", + signed_bid.message.blob_kzg_commitments.len(), + MAX_BLOB_COMMITMENTS_PER_BLOCK + )); + } + + info!( + slot, + value = signed_bid.message.value, + block_hash = ?signed_bid.message.block_hash, + builder_index = signed_bid.message.builder_index, + "Submitting bid to beacon node for P2P gossip" + ); + + // submit to beacon node + self.beacon_client + .submit_execution_payload_bid(&signed_bid) + .await?; + + // keep track of our bid + bid_tracker.on_bid_submitted(&signed_bid); + + info!( + slot, + value = signed_bid.message.value, + "Bid submitted successfully" + ); + + Ok(()) + } + + /// Create the bid resubmission interval timer. + fn create_bid_interval(&self) -> tokio::time::Interval { + let interval_ms = if self.config.bid_interval_ms > 0 { + self.config.bid_interval_ms + } else { + // single bid mode: use a long interval, bids will be gated by the scheduler + 1000 + }; + let mut interval = tokio::time::interval(std::time::Duration::from_millis(interval_ms)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + interval + } + + /// SSE listener for head events. + async fn run_head_listener( + client: Client, + tx: mpsc::UnboundedSender, + cancel: CancellationToken, + ) { + loop { + if cancel.is_cancelled() { + return; + } + + match client.get_events::().await { + Ok(mut stream) => { + info!("Connected to beacon node head SSE stream"); + loop { + tokio::select! { + _ = cancel.cancelled() => return, + event = stream.next() => { + match event { + Some(Ok(head_event)) => { + if tx.send(head_event).is_err() { + return; // Receiver dropped + } + } + Some(Err(e)) => { + warn!(error = %e, "Error in head SSE stream"); + break; // Reconnect + } + None => { + warn!("Head SSE stream ended"); + break; // Reconnect + } + } + } + } + } + } + Err(e) => { + warn!(error = %e, "Failed to connect to head SSE stream"); + } + } + + // backoff before reconnecting + tokio::select! { + _ = cancel.cancelled() => return, + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {} + } + } + } + + /// SSE listener for execution payload bid events. + async fn run_bid_listener( + client: Client, + tx: mpsc::UnboundedSender, + cancel: CancellationToken, + ) { + loop { + if cancel.is_cancelled() { + return; + } + + match client.get_events::().await { + Ok(mut stream) => { + info!("Connected to beacon node bid SSE stream"); + loop { + tokio::select! { + _ = cancel.cancelled() => return, + event = stream.next() => { + match event { + Some(Ok(bid)) => { + if tx.send(bid).is_err() { + return; + } + } + Some(Err(e)) => { + warn!(error = %e, "Error in bid SSE stream"); + break; + } + None => { + warn!("Bid SSE stream ended"); + break; + } + } + } + } + } + } + Err(e) => { + warn!(error = %e, "Failed to connect to bid SSE stream"); + } + } + + tokio::select! { + _ = cancel.cancelled() => return, + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {} + } + } + } + + /// SSE listener for proposer preferences events. + async fn run_prefs_listener( + client: Client, + cache: Arc, + cancel: CancellationToken, + ) { + loop { + if cancel.is_cancelled() { + return; + } + + match client.get_events::().await { + Ok(mut stream) => { + info!("Connected to beacon node proposer preferences SSE stream"); + loop { + tokio::select! { + _ = cancel.cancelled() => return, + event = stream.next() => { + match event { + Some(Ok(signed_prefs)) => { + cache.insert(signed_prefs.message); + } + Some(Err(e)) => { + warn!(error = %e, "Error in proposer preferences SSE stream"); + break; + } + None => { + warn!("Proposer preferences SSE stream ended"); + break; + } + } + } + } + } + } + Err(e) => { + warn!(error = %e, "Failed to connect to proposer preferences SSE stream"); + } + } + + tokio::select! { + _ = cancel.cancelled() => return, + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {} + } + } + } +} + +impl std::fmt::Debug for EpbsP2PService { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EpbsP2PService") + .field("config", &self.config) + .finish() + } +} diff --git a/crates/rbuilder/src/live_builder/builder_api/p2p/types.rs b/crates/rbuilder/src/live_builder/builder_api/p2p/types.rs new file mode 100644 index 000000000..e83dfd582 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/p2p/types.rs @@ -0,0 +1,53 @@ +use serde::Deserialize; + +/// config for the p2p epbs builder service. +#[derive(Debug, Clone)] +pub struct EpbsP2PConfig { + /// Whether P2P bid broadcasting is enabled. + pub enabled: bool, + /// Milliseconds into slot to start bidding (default: 0 = slot start). + pub bid_start_ms: u64, + /// Milliseconds into slot to stop bidding (default: 4000). + pub bid_end_ms: u64, + /// Interval between bid resubmissions in ms. 0 = single bid mode. + pub bid_interval_ms: u64, + /// Value increment per resubmission in gwei. + pub bid_value_increment_gwei: u64, + /// Genesis time from the beacon chain (seconds since unix epoch). + pub genesis_time: u64, + /// Slot duration in seconds (from beacon spec). + pub seconds_per_slot: u64, +} + +impl Default for EpbsP2PConfig { + fn default() -> Self { + Self { + enabled: false, + bid_start_ms: 0, + bid_end_ms: 4000, + bid_interval_ms: 500, + bid_value_increment_gwei: 0, + genesis_time: 0, + seconds_per_slot: 12, + } + } +} + +/// event types used by the P2P service event loop. +#[derive(Debug, Clone)] +pub enum P2PEvent { + /// A new head was seen on the beacon chain. + NewHead(HeadEventData), + /// A competing bid was seen on the P2P network. + BidReceived(rbuilder_primitives::epbs::SignedExecutionPayloadBid), + /// Proposer preferences received from P2P gossip. + ProposerPreferences(rbuilder_primitives::epbs::SignedProposerPreferences), +} + +/// Extracted head event data relevant to the p2p builder. +#[derive(Debug, Clone, Deserialize)] +pub struct HeadEventData { + pub slot: u64, + pub block_root: alloy_primitives::B256, + pub state_root: alloy_primitives::B256, +} diff --git a/crates/rbuilder/src/live_builder/builder_api/server.rs b/crates/rbuilder/src/live_builder/builder_api/server.rs new file mode 100644 index 000000000..01ab718b1 --- /dev/null +++ b/crates/rbuilder/src/live_builder/builder_api/server.rs @@ -0,0 +1,166 @@ +//! EPBS Builder API HTTP Server. + +use alloy_primitives::BlockHash; +use axum::{routing::get, Router}; +use parking_lot::RwLock; +use rbuilder_primitives::epbs::{CachedPayloadData, GetBidParams, SignedExecutionPayloadBid}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use tokio::net::TcpListener; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use super::handlers::{get_execution_payload_bid_handler, status_handler}; + +#[derive(Debug, Clone)] +pub struct EpbsBuilderServerConfig { + /// server address + pub listen_addr: SocketAddr, + /// max age for cached payloads before they are evicted. + pub cache_ttl: Duration, +} + +impl Default for EpbsBuilderServerConfig { + fn default() -> Self { + Self { + listen_addr: "0.0.0.0:18551".parse().unwrap(), + cache_ttl: Duration::from_secs(32 * 12), // setting upto 2 epochs + } + } +} + +/// Trait for generating EPBS bids. +/// +/// This trait is implemented by the block builder to provide bids +/// to the EPBS Builder API server. +#[async_trait::async_trait] +pub trait EpbsBidProvider: Send + Sync { + /// generates the signed execution payload and returns it if no error encountered + /// returns none if no bid can be generated (e.g., unknown slot, no payload ready). + async fn generate_bid( + &self, + params: &GetBidParams, + ) -> eyre::Result>; +} + +/// State shared between the HTTP server and handlers. +pub struct EpbsBuilderState { + /// builder server config + pub config: EpbsBuilderServerConfig, + /// bid provider (block builder integration). + bid_provider: Arc, + /// cache the generated payloads, keyed by block_hash. + /// when a bid is returned, the full payload is cached here + /// so it can be revealed when the beacon block is seen. + payload_cache: RwLock>, +} + +impl std::fmt::Debug for EpbsBuilderState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EpbsBuilderState") + .field("config", &self.config) + .field("payload_cache_len", &self.payload_cache.read().len()) + .finish() + } +} + +impl EpbsBuilderState { + pub fn new(config: EpbsBuilderServerConfig, bid_provider: Arc) -> Self { + Self { + config, + bid_provider, + payload_cache: RwLock::new(HashMap::new()), + } + } + + /// Returns a signed execution payload bid given the bid params. + pub async fn get_execution_payload_bid( + &self, + params: &GetBidParams, + ) -> eyre::Result> { + self.bid_provider.generate_bid(params).await + } + + /// cache a payload for later revelation. + pub fn cache_payload(&self, data: CachedPayloadData) { + let block_hash = data.bid.message.block_hash; + self.payload_cache.write().insert(block_hash, data); + } + + pub fn get_cached_payload(&self, block_hash: &BlockHash) -> Option { + self.payload_cache.read().get(block_hash).cloned() + } + + pub fn cleanup_cache(&self) { + let ttl = self.config.cache_ttl; + self.payload_cache + .write() + .retain(|_, v| v.created_at.elapsed() < ttl); + } +} + +/// EPBS Builder API HTTP Server. +#[derive(Debug)] +pub struct EpbsBuilderServer { + state: Arc, +} + +impl EpbsBuilderServer { + pub fn new(config: EpbsBuilderServerConfig, bid_provider: Arc) -> Self { + Self { + state: Arc::new(EpbsBuilderState::new(config, bid_provider)), + } + } + + pub fn state(&self) -> Arc { + self.state.clone() + } + + /// Returns the listen address for this server. + pub fn listen_addr(&self) -> std::net::SocketAddr { + self.state.config.listen_addr + } + + fn build_router(&self) -> Router { + Router::new() + .route( + "/eth/v1/builder/execution_payload_bid/:slot/:parent_hash/:parent_root/:proposer_index", + get(get_execution_payload_bid_handler), + ) + .route("/eth/v1/builder/status", get(status_handler)) + .with_state(self.state.clone()) + } + + pub async fn run(self, cancel: CancellationToken) -> eyre::Result<()> { + let addr = self.state.config.listen_addr; + let router = self.build_router(); + + info!("starting builder server for epbs bids {}", addr); + + let listener = TcpListener::bind(addr).await?; + + // spawn cache cleanup task + let state_clone = self.state.clone(); + let cancel_clone = cancel.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + loop { + tokio::select! { + _ = cancel_clone.cancelled() => break, + _ = interval.tick() => { + state_clone.cleanup_cache(); + } + } + } + }); + + // run the server + axum::serve(listener, router) + .with_graceful_shutdown(async move { + cancel.cancelled().await; + info!("shutting down builder server"); + }) + .await?; + + Ok(()) + } +} diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index b35be25d6..f7070f425 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -7,10 +7,15 @@ use super::{ bidding_service_interface::{ BidObserver, BiddingService, LandedBlockInfo, NullBidObserver, }, + block_observer, relay_submit::{RelaySubmitSinkFactory, SubmissionConfig}, true_value_bidding_service::NewTrueBlockValueBiddingService, unfinished_block_processing::UnfinishedBuiltBlocksInputFactory, }, + builder_api::{ + EpbsBuilderServer, EpbsBuilderServerConfig, EpbsP2PService, LiveEpbsBidProvider, + LiveEpbsBidProviderConfig, + }, wallet_balance_watcher::WalletBalanceWatcher, }; use crate::{ @@ -42,6 +47,7 @@ use crate::{ mev_boost::{ bloxroute_grpc, optimistic_v3::{self, OptimisticV3BlockCache}, + sign_epbs::EpbsBidSigner, BLSBlockSigner, MevBoostRelayBidSubmitter, MevBoostRelaySlotInfoProvider, RelayClient, RelayConfig, RelaySubmitConfig, }, @@ -94,6 +100,19 @@ pub const BID_SOURCE_TIMEOUT_SECS: u64 = 28; /// Don't want to waste too much time in case i failed to non-boost block. pub const BID_SOURCE_WAIT_TIME_SECS: u64 = 2; +pub const DEFAULT_EPBS_SERVER_PORT: u16 = 18551; + +/// Default for epbs_enabled - enabled by default. +/// Signing domain will be fetched from beacon chain in background if not configured +fn default_epbs_enabled() -> bool { + true +} + +/// Default EPBS server port +fn default_epbs_server_port() -> u16 { + DEFAULT_EPBS_SERVER_PORT +} + #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] #[serde(tag = "algo", rename_all = "kebab-case", deny_unknown_fields)] pub enum SpecificBuilderConfig { @@ -216,6 +235,47 @@ pub struct L1Config { pub optimistic_v3_public_url: String, /// The relay pubkey. pub optimistic_v3_relay_pubkeys: HashSet, + + /// Enable EPBS Builder API server. + #[serde(default = "default_epbs_enabled")] + pub epbs_enabled: bool, + /// EPBS Builder API server IP. + #[serde(default = "default_ip")] + pub epbs_server_ip: Ipv4Addr, + /// EPBS Builder API server port. + #[serde(default = "default_epbs_server_port")] + pub epbs_server_port: u16, + /// Secret key for the builder's validator (for signing EPBS bids). + /// If not provided, relay_secret_key will be used. + epbs_builder_secret_key: Option>, + /// Signing domain for EPBS bids (32 bytes hex). + /// Computed from: DOMAIN_BEACON_BUILDER + fork_data_root + /// Can be set via env var: "$EPBS_SIGNING_DOMAIN" + epbs_signing_domain: Option>, + + /// Enable P2P ePBS builder (bid gossip via beacon node). + #[serde(default)] + pub epbs_p2p_enabled: bool, + /// Milliseconds into slot to start bidding (P2P mode). + #[serde(default)] + pub epbs_p2p_bid_start_ms: u64, + /// Milliseconds into slot to stop bidding (P2P mode). + #[serde(default = "default_epbs_p2p_bid_end_ms")] + pub epbs_p2p_bid_end_ms: u64, + /// Interval between bid resubmissions in ms (0 = single bid, P2P mode). + #[serde(default = "default_epbs_p2p_bid_interval_ms")] + pub epbs_p2p_bid_interval_ms: u64, + /// Value increment per resubmission in gwei (P2P mode). + #[serde(default)] + pub epbs_p2p_bid_value_increment_gwei: u64, +} + +fn default_epbs_p2p_bid_end_ms() -> u64 { + 4000 +} + +fn default_epbs_p2p_bid_interval_ms() -> u64 { + 500 } impl Default for L1Config { @@ -232,6 +292,18 @@ impl Default for L1Config { optimistic_v3_server_port: 6071, optimistic_v3_public_url: String::new(), optimistic_v3_relay_pubkeys: HashSet::default(), + // EPBS defaults - enabled by default for testing + epbs_enabled: true, + epbs_server_ip: default_ip(), + epbs_server_port: DEFAULT_EPBS_SERVER_PORT, + epbs_builder_secret_key: None, + epbs_signing_domain: None, + // EPBS P2P defaults - disabled by default + epbs_p2p_enabled: false, + epbs_p2p_bid_start_ms: 0, + epbs_p2p_bid_end_ms: default_epbs_p2p_bid_end_ms(), + epbs_p2p_bid_interval_ms: default_epbs_p2p_bid_interval_ms(), + epbs_p2p_bid_value_increment_gwei: 0, } } } @@ -251,6 +323,276 @@ impl L1Config { .collect() } + pub fn epbs_server_addr(&self) -> SocketAddr { + SocketAddr::V4(SocketAddrV4::new(self.epbs_server_ip, self.epbs_server_port)) + } + + /// Returns the EPBS builder secret key, falling back to relay_secret_key if not set. + pub fn epbs_secret_key(&self) -> eyre::Result> { + let key_str = if let Some(key) = &self.epbs_builder_secret_key { + Some(key.value()?) + } else if let Some(key) = &self.relay_secret_key { + Some(key.value()?) + } else { + None + }; + + match key_str { + Some(s) => { + let key = SecretKey::try_from(s) + .map_err(|e| eyre::eyre!("Failed to parse EPBS secret key: {:?}", e))?; + Ok(Some(key)) + } + None => Ok(None), + } + } + + /// Returns the EPBS signing domain from config/env if set. + /// + /// The signing domain should be computed as: + /// `DOMAIN_BEACON_BUILDER (4 bytes) + fork_data_root[0:28]` + /// + /// Can be configured via: + /// - Config file: `epbs_signing_domain = "0x..."` + /// - Environment variable: `epbs_signing_domain = "$EPBS_SIGNING_DOMAIN"` + /// + /// Returns None if not configured (will be fetched from beacon chain). + pub fn epbs_signing_domain(&self) -> eyre::Result> { + match &self.epbs_signing_domain { + Some(domain) => { + let domain_str = domain.value()?; + let domain_str = domain_str.strip_prefix("0x").unwrap_or(&domain_str); + let bytes = hex::decode(domain_str) + .map_err(|e| eyre::eyre!("Failed to decode EPBS signing domain: {}", e))?; + if bytes.len() != 32 { + return Err(eyre::eyre!( + "EPBS signing domain must be 32 bytes, got {}", + bytes.len() + )); + } + let mut arr = [0u8; 32]; + arr.copy_from_slice(&bytes); + Ok(Some(B256::from(arr))) + } + None => Ok(None), + } + } + + /// Create EPBS components if enabled. + /// + /// The builder_index and signing_domain are fetched from the beacon chain + /// in a background task, allowing the server to start immediately without blocking. + /// + /// Returns: + /// - The EPBS bid provider (also implements BlockObserver) + /// - The EPBS server (to be spawned) + /// - Optionally, the P2P service (if epbs_p2p_enabled) + /// + /// Returns None if EPBS is not enabled. + pub fn create_epbs_components( + &self, + ) -> eyre::Result< + Option<( + Arc, + EpbsBuilderServer, + Option, + )>, + > { + use crate::mev_boost::sign_epbs::compute_epbs_domain; + + if !self.epbs_enabled { + info!("EPBS Builder API server is disabled"); + return Ok(None); + } + + info!( + listen_addr = %self.epbs_server_addr(), + "EPBS Builder API server is enabled" + ); + + let secret_key = self + .epbs_secret_key()? + .ok_or_else(|| eyre::eyre!("EPBS secret key is required when epbs_enabled is true"))?; + + // get pubkey for retreiving builder_index + let pubkey = secret_key.public_key(); + let pubkey_bytes = pubkey.as_ref().to_vec(); + + // Get signing domain from config (optional - will be fetched if not provided) + let signing_domain = self.epbs_signing_domain()?; + + let clients = self.beacon_clients()?; + if clients.is_empty() { + return Err(eyre::eyre!( + "No beacon chain clients configured. Set cl_node_url for EPBS." + )); + } + + // Create provider without signer - will be initialized in background + // after fetching builder_index and signing domain from beacon chain + info!("Will fetch builder_index and signing domain from beacon chain in background"); + let provider = Arc::new(LiveEpbsBidProvider::new_uninitialized( + LiveEpbsBidProviderConfig::default(), + )); + + let p2p_beacon_client = clients.first().cloned(); + + // Spawn background task to fetch builder_index and signing domain, then initialize signer + let provider_clone = provider.clone(); + tokio::spawn(async move { + // retry config + const MAX_RETRIES: u32 = 120; // ~20 minutes with max backoff + const INITIAL_BACKOFF_MS: u64 = 1000; + const MAX_BACKOFF_MS: u64 = 10000; + + let mut last_error: Option = None; + let mut attempt = 0; + + loop { + attempt += 1; + + for client in &clients { + // Look up builder_index by public key + let builder_index = match client.get_validator_by_pubkey(&pubkey_bytes).await { + Ok(validator) => { + info!( + builder_index = validator.index, + pubkey = %validator.validator.pubkey, + status = %validator.status, + "Found builder validator on beacon chain" + ); + + // Check if the validator is a builder (has BUILDER_WITHDRAWAL_PREFIX) + if !validator + .validator + .withdrawal_credentials + .starts_with("0x03") + { + tracing::warn!( + withdrawal_credentials = %validator.validator.withdrawal_credentials, + "Validator does not have BUILDER_WITHDRAWAL_PREFIX (0x03). \ + This validator may not be recognized as a builder." + ); + } + + validator.index + } + Err(e) => { + last_error = Some(e); + continue; + } + }; + + // Get signing domain (from config or beacon chain) + let domain = if let Some(domain) = signing_domain { + info!("Using configured EPBS signing domain"); + domain + } else { + match client.get_genesis().await { + Ok(genesis) => { + let domain = compute_epbs_domain( + genesis.genesis_fork_version, + genesis.genesis_validators_root, + ); + info!( + ?domain, + genesis_fork_version = ?genesis.genesis_fork_version, + genesis_validators_root = ?genesis.genesis_validators_root, + "Computed EPBS signing domain from beacon chain" + ); + domain + } + Err(e) => { + last_error = Some(e); + continue; + } + } + }; + + // Create and set the signer + let signer = EpbsBidSigner::new(secret_key, builder_index, domain); + provider_clone.set_signer(signer); + info!( + builder_index, + "EPBS signer initialized, bid generation is now enabled" + ); + return; + } + + if attempt >= MAX_RETRIES { + tracing::error!( + "Failed to initialize EPBS signer after {} attempts: {:?}. EPBS bids will not be generated.", + MAX_RETRIES, + last_error + ); + return; + } + + let backoff_ms: u64 = + std::cmp::min(INITIAL_BACKOFF_MS * 2u64.pow(attempt - 1), MAX_BACKOFF_MS); + + info!( + attempt, + max_retries = MAX_RETRIES, + backoff_ms, + error = ?last_error, + "Beacon client not ready, retrying in background..." + ); + tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await; + } + }); + + let server_config = EpbsBuilderServerConfig { + listen_addr: self.epbs_server_addr(), + ..Default::default() + }; + + // Create the server + let server = EpbsBuilderServer::new(server_config, provider.clone()); + + info!( + listen_addr = %self.epbs_server_addr(), + "EPBS Builder API server configured (waiting for beacon chain for builder_index)" + ); + + // Create P2P service if enabled + let p2p_service = if self.epbs_p2p_enabled { + use super::builder_api::p2p::{EpbsP2PConfig}; + + info!("EPBS P2P builder service is enabled"); + + let p2p_config = EpbsP2PConfig { + enabled: true, + bid_start_ms: self.epbs_p2p_bid_start_ms, + bid_end_ms: self.epbs_p2p_bid_end_ms, + bid_interval_ms: self.epbs_p2p_bid_interval_ms, + bid_value_increment_gwei: self.epbs_p2p_bid_value_increment_gwei, + // TODO: make this better + // genesis_time and seconds_per_slot will be set once beacon chain is available. + // For now use defaults; the service waits for the signer to be ready anyway. + genesis_time: 0, + seconds_per_slot: 12, + }; + + let beacon_client = p2p_beacon_client + .ok_or_else(|| eyre::eyre!("No beacon client available for P2P service"))?; + + let p2p_service = EpbsP2PService::new( + p2p_config, + beacon_client, + provider.clone(), + provider.shared_signer(), + provider.shared_payload_cache(), + ); + + Some(p2p_service) + } else { + None + }; + + Ok(Some((provider, server, p2p_service))) + } + /// Analyzes relay_config and creates MevBoostRelayBidSubmitter/MevBoostRelaySlotInfoProvider as needed. fn create_relay_sub_objects( relay_config: &RelayConfig, @@ -510,6 +852,19 @@ impl LiveBuilderConfig for Config { let (wallet_balance_watcher, _) = create_wallet_balance_watcher(provider.clone(), &self.base_config).await?; + // Create EPBS components if enabled + let epbs_components = self.l1_config.create_epbs_components()?; + let (block_observer, epbs_server, epbs_p2p_service): ( + Option>, + Option, + Option, + ) = match epbs_components { + Some((bid_provider, server, p2p_service)) => { + (Some(bid_provider), Some(server), p2p_service) + } + None => (None, None, None), + }; + let (sink_factory, slot_info_provider, adjustment_fee_payers) = create_sink_factory_and_relays( &self.base_config, @@ -519,10 +874,11 @@ impl LiveBuilderConfig for Config { Box::new(NullBidObserver {}), bidding_service, cancellation_token.clone(), + block_observer, ) .await?; - let live_builder = create_builder_from_sink( + let mut live_builder = create_builder_from_sink( &self.base_config, &self.l1_config, provider, @@ -532,6 +888,16 @@ impl LiveBuilderConfig for Config { cancellation_token, ) .await?; + + // Set EPBS server if enabled + if let Some(server) = epbs_server { + live_builder = live_builder.with_epbs_server(server); + } + // Set EPBS P2P service if enabled + if let Some(p2p_service) = epbs_p2p_service { + live_builder = live_builder.with_epbs_p2p_service(p2p_service); + } + let builders = create_builders( self.live_builders()?, self.base_config.max_order_execution_duration_warning(), @@ -1082,6 +1448,7 @@ pub async fn create_sink_factory_and_relays

( bid_observer: Box, bidding_service: Arc, cancellation_token: CancellationToken, + block_observer: Option>, ) -> eyre::Result<( UnfinishedBuiltBlocksInputFactory

, Vec, @@ -1107,7 +1474,7 @@ where ); } - let sink_factory = UnfinishedBuiltBlocksInputFactory::new( + let mut sink_factory = UnfinishedBuiltBlocksInputFactory::new( bidding_service, sink_sealed_factory, wallet_balance_watcher, @@ -1115,6 +1482,11 @@ where relay_sets, ); + // Wire block observer for EPBS integration + if let Some(observer) = block_observer { + sink_factory = sink_factory.with_block_observer(observer); + } + Ok((sink_factory, slot_info_provider, adjustment_fee_payers)) } diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index bdcdd38f5..c3aa66a3b 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -1,6 +1,7 @@ pub mod base_config; pub mod block_list_provider; pub mod block_output; +pub mod builder_api; pub mod building; pub mod cli; pub mod config; @@ -15,6 +16,7 @@ pub mod watchdog; use crate::{ building::{builders::BlockBuildingAlgorithm, BlockBuildingContext}, live_builder::{ + builder_api::EpbsBuilderServer, order_flow_tracing::order_flow_tracer_manager::OrderFlowTracerManager, order_input::{start_orderpool_jobs, OrderInputConfig}, process_killer::ProcessKiller, @@ -134,6 +136,15 @@ where pub simulation_use_random_coinbase: bool, pub order_flow_tracer_manager: Box, + + /// Optional EPBS Builder API server (EIP-7732). + /// When set, the server will be spawned alongside the builder. + pub epbs_server: Option, + + /// Optional EPBS P2P builder service. + /// When set, bids will be broadcast via p2p and payload envelopes + /// will be revealed after bid inclusion in beacon blocks. + pub epbs_p2p_service: Option, } impl

LiveBuilder

@@ -148,6 +159,31 @@ where Self { builders, ..self } } + /// Set the EPBS Builder API server. + /// + /// When set, the server will be spawned when `run()` is called and will + /// serve bids to proposers via the Builder API (EIP-7732). + pub fn with_epbs_server(self, server: EpbsBuilderServer) -> Self { + Self { + epbs_server: Some(server), + ..self + } + } + + /// Set the EPBS P2P builder service. + /// + /// When set, the service will be spawned when `run()` is called and will + /// broadcast bids via p2p gossip and reveal payloads after bid inclusion. + pub fn with_epbs_p2p_service( + self, + service: builder_api::EpbsP2PService, + ) -> Self { + Self { + epbs_p2p_service: Some(service), + ..self + } + } + pub async fn run( self, ready_to_build: Arc, // If Some, we should send a message for every slot we start building. @@ -233,6 +269,31 @@ where self.order_flow_tracer_manager, ); + // Spawn EPBS Builder API server if configured + if let Some(epbs_server) = self.epbs_server { + let cancel = self.global_cancellation.clone(); + info!( + listen_addr = %epbs_server.listen_addr(), + "Starting EPBS Builder API server" + ); + inner_jobs_handles.push(tokio::spawn(async move { + if let Err(e) = epbs_server.run(cancel).await { + error!(?e, "EPBS Builder API server error"); + } + })); + } + + // Spawn epbs p2p builder service if configured + if let Some(p2p_service) = self.epbs_p2p_service { + let cancel = self.global_cancellation.clone(); + info!("Starting EPBS P2P builder service"); + inner_jobs_handles.push(tokio::spawn(async move { + if let Err(e) = p2p_service.run(cancel).await { + error!(?e, "EPBS P2P builder service error"); + } + })); + } + ready_to_build.store(true, Ordering::Relaxed); while let Some(payload) = payload_events_channel.recv().await { let blocklist = self.blocklist_provider.get_blocklist()?; diff --git a/crates/rbuilder/src/mev_boost/mod.rs b/crates/rbuilder/src/mev_boost/mod.rs index 1168f5f12..179a32cb7 100644 --- a/crates/rbuilder/src/mev_boost/mod.rs +++ b/crates/rbuilder/src/mev_boost/mod.rs @@ -28,8 +28,10 @@ mod error; pub mod fake_mev_boost_relay; pub mod optimistic_v3; pub mod rpc; +pub mod sign_epbs; pub mod sign_payload; pub use error::*; +pub use sign_epbs::*; pub use sign_payload::*; const TOTAL_PAYMENT_HEADER: &str = "Total-Payment"; diff --git a/crates/rbuilder/src/mev_boost/sign_epbs.rs b/crates/rbuilder/src/mev_boost/sign_epbs.rs new file mode 100644 index 000000000..0fb9e04ff --- /dev/null +++ b/crates/rbuilder/src/mev_boost/sign_epbs.rs @@ -0,0 +1,406 @@ +//! EPBS bid and envelope signing for EIP-7732/Gloas. +//! +//! This module implements signing for ExecutionPayloadBid and ExecutionPayloadEnvelope +//! using the DOMAIN_BEACON_BUILDER domain as specified in the consensus specs. + +use alloy_primitives::{Address, BlockHash, B256}; +use alloy_rpc_types_beacon::BlsSignature; +use ethereum_consensus::{ + bellatrix::Transaction, + capella::Withdrawal, + crypto::SecretKey, + primitives::{Bytes32, ExecutionAddress, Gwei, Hash32}, + signing::sign_with_domain, + ssz::prelude::*, +}; +use rbuilder_primitives::epbs::{ + ExecutionPayloadBid, ExecutionPayloadEnvelope, SignedExecutionPayloadBid, + SignedExecutionPayloadEnvelope, +}; + +/// DOMAIN_BEACON_BUILDER from consensus-specs/specs/gloas/beacon-chain.md +/// Value: DomainType('0x0B000000') +pub const DOMAIN_BEACON_BUILDER: [u8; 4] = [0x0B, 0x00, 0x00, 0x00]; + +/// Signer for EPBS bids using the builder's validator key. +/// +/// uses DOMAIN_BEACON_BUILDER since the builder is now a staked +/// validator in the beacon chain. +#[derive(Debug, Clone)] +pub struct EpbsBidSigner { + /// Builder validator secret key. + sec: SecretKey, + /// The builders validator index in the beacon chain. + builder_index: u64, + /// Pre comp domain for signing (DOMAIN_BEACON_BUILDER + fork version + genesis validators root). + domain: B256, +} + +impl EpbsBidSigner { + /// Create a new EPBS bid signer. + pub fn new(sec: SecretKey, builder_index: u64, domain: B256) -> Self { + Self { + sec, + builder_index, + domain, + } + } + + /// Create from a hex-encoded secret key string. + pub fn from_string(secret_key: String, builder_index: u64, domain: B256) -> eyre::Result { + let secret_key = SecretKey::try_from(secret_key) + .map_err(|e| eyre::eyre!("Failed to parse key: {:?}", e.to_string()))?; + Ok(Self::new(secret_key, builder_index, domain)) + } + + /// Get the builder's validator index. + pub fn builder_index(&self) -> u64 { + self.builder_index + } + + /// Get the builder's public key. + pub fn pub_key(&self) -> alloy_rpc_types_beacon::BlsPublicKey { + alloy_rpc_types_beacon::BlsPublicKey::from_slice(&self.sec.public_key()) + } + + /// Sign an ExecutionPayloadBid. + /// + /// This follows the spec: + /// ```python + /// def get_execution_payload_bid_signature( + /// state: BeaconState, bid: ExecutionPayloadBid, privkey: int + /// ) -> BLSSignature + pub fn sign_bid(&self, bid: &ExecutionPayloadBid) -> eyre::Result { + let ssz_bid = SszExecutionPayloadBid::from_bid(bid); + let signature = sign_with_domain(&ssz_bid, &self.sec, *self.domain)?; + let signature = BlsSignature::from_slice(&signature); + + Ok(SignedExecutionPayloadBid { + message: bid.clone(), + signature, + }) + } + + pub fn sign_envelope( + &self, + envelope: &ExecutionPayloadEnvelope, + ) -> eyre::Result { + let ssz_envelope = SszExecutionPayloadEnvelope::from_envelope(envelope)?; + let signature = sign_with_domain(&ssz_envelope, &self.sec, *self.domain)?; + let signature = BlsSignature::from_slice(&signature); + + Ok(SignedExecutionPayloadEnvelope { + message: envelope.clone(), + signature, + }) + } +} + +/// SSZ-merkleizable version of ExecutionPayloadBid for signing. + +#[derive(Debug, Clone, PartialEq, Eq, Hash, SimpleSerialize)] +pub struct SszExecutionPayloadBid { + pub parent_block_hash: Hash32, + pub parent_block_root: Hash32, + pub block_hash: Hash32, + pub prev_randao: Hash32, + pub fee_recipient: ExecutionAddress, + pub gas_limit: u64, + pub builder_index: u64, + pub slot: u64, + pub value: u64, + pub execution_payment: u64, + pub blob_kzg_commitments_root: Hash32, +} + +// TODO: use a better approach here. Import types when available rather +impl SszExecutionPayloadBid { + pub fn from_bid(bid: &ExecutionPayloadBid) -> Self { + let commitments_refs: Vec<&[u8]> = bid + .blob_kzg_commitments + .iter() + .map(|c| c.as_ref()) + .collect(); + let commitments_root = + rbuilder_primitives::mev_boost::ssz_roots::calculate_blob_kzg_commitments_root_ssz( + &commitments_refs, + ); + + Self { + parent_block_hash: hash32_from_block_hash(&bid.parent_block_hash), + parent_block_root: hash32_from_b256(&bid.parent_block_root), + block_hash: hash32_from_block_hash(&bid.block_hash), + prev_randao: hash32_from_b256(&bid.prev_randao), + fee_recipient: address_to_execution_address(&bid.fee_recipient), + gas_limit: bid.gas_limit, + builder_index: bid.builder_index, + slot: bid.slot, + value: bid.value, + execution_payment: bid.execution_payment, + blob_kzg_commitments_root: hash32_from_b256(&commitments_root), + } + } +} + +// mainnet constants from consensus-specs +const BYTES_PER_LOGS_BLOOM: usize = 256; +const MAX_EXTRA_DATA_BYTES: usize = 32; +const MAX_BYTES_PER_TRANSACTION: usize = 1_073_741_824; // 2^30 +const MAX_TRANSACTIONS_PER_PAYLOAD: usize = 1_048_576; // 2^20 +const MAX_WITHDRAWALS_PER_PAYLOAD: usize = 16; +const MAX_DEPOSIT_REQUESTS_PER_PAYLOAD: usize = 8192; // 2^13 +const MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD: usize = 16; // 2^4 +const MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD: usize = 2; // 2^1 + +// TODO: import via libs when available +/// SSZ `DepositRequest` from Electra. +#[derive(Default, Debug, Clone, PartialEq, Eq, SimpleSerialize)] +pub struct SszDepositRequest { + pub pubkey: ByteVector<48>, + pub withdrawal_credentials: Bytes32, + pub amount: u64, + pub signature: ByteVector<96>, + pub index: u64, +} + +// TODO: import via libs when available +/// SSZ `WithdrawalRequest` from Electra. +#[derive(Default, Debug, Clone, PartialEq, Eq, SimpleSerialize)] +pub struct SszWithdrawalRequest { + pub source_address: ExecutionAddress, + pub validator_pubkey: ByteVector<48>, + pub amount: u64, +} + +// TODO: import via libs when available +/// SSZ `ConsolidationRequest` from Electra. +#[derive(Default, Debug, Clone, PartialEq, Eq, SimpleSerialize)] +pub struct SszConsolidationRequest { + pub source_address: ExecutionAddress, + pub source_pubkey: ByteVector<48>, + pub target_pubkey: ByteVector<48>, +} + +// TODO: import via libs when available +/// SSZ `ExecutionRequests` from Electra. +#[derive(Default, Debug, Clone, PartialEq, Eq, SimpleSerialize)] +pub struct SszExecutionRequests { + pub deposits: List, + pub withdrawals: List, + pub consolidations: List, +} + +/// SSZ `ExecutionPayload` +#[derive(Default, Debug, Clone, PartialEq, Eq, SimpleSerialize)] +pub struct SszExecutionPayload { + pub parent_hash: Hash32, + pub fee_recipient: ExecutionAddress, + pub state_root: Bytes32, + pub receipts_root: Bytes32, + pub logs_bloom: ByteVector, + pub prev_randao: Bytes32, + pub block_number: u64, + pub gas_limit: u64, + pub gas_used: u64, + pub timestamp: u64, + pub extra_data: ByteList, + pub base_fee_per_gas: U256, + pub block_hash: Hash32, + pub transactions: + List, MAX_TRANSACTIONS_PER_PAYLOAD>, + pub withdrawals: List, + pub blob_gas_used: u64, + pub excess_blob_gas: u64, +} + +// TODO: import via libs when available +/// SSZ `ExecutionPayloadEnvelope` from Gloas. +#[derive(Default, Debug, Clone, PartialEq, Eq, SimpleSerialize)] +pub struct SszExecutionPayloadEnvelope { + pub payload: SszExecutionPayload, + pub execution_requests: SszExecutionRequests, + pub builder_index: u64, + pub beacon_block_root: Hash32, + pub slot: u64, + pub state_root: Hash32, +} + +impl SszExecutionPayloadEnvelope { + pub fn from_envelope( + envelope: &ExecutionPayloadEnvelope, + ) -> eyre::Result { + let inner1 = &envelope.payload.payload_inner.payload_inner; + let inner2 = &envelope.payload.payload_inner; + let inner3 = &envelope.payload; + + // convert transactions + let mut transactions = List::default(); + for tx_bytes in &inner1.transactions { + let tx = Transaction::try_from(tx_bytes.as_ref()) + .map_err(|e| eyre::eyre!("Failed to convert transaction: {:?}", e))?; + transactions.push(tx); + } + + // convert withdrawals + let mut withdrawals = List::default(); + for w in &inner2.withdrawals { + let withdrawal = Withdrawal { + index: w.index as usize, + validator_index: w.validator_index as usize, + address: ExecutionAddress::try_from(w.address.as_slice()) + .expect("Address is 20 bytes"), + amount: w.amount as Gwei, + }; + withdrawals.push(withdrawal); + } + + // convert extra_data + let extra_data = ByteList::try_from(inner1.extra_data.as_ref()) + .map_err(|e| eyre::eyre!("Extra data too long: {:?}", e))?; + + let payload = SszExecutionPayload { + parent_hash: hash32_from_b256(&B256::from(inner1.parent_hash)), + fee_recipient: ExecutionAddress::try_from(inner1.fee_recipient.as_slice()) + .expect("Address is 20 bytes"), + state_root: bytes32_from_b256(&B256::from(inner1.state_root)), + receipts_root: bytes32_from_b256(&B256::from(inner1.receipts_root)), + logs_bloom: ByteVector::try_from(inner1.logs_bloom.as_ref()) + .map_err(|e| eyre::eyre!("Invalid logs_bloom: {:?}", e))?, + prev_randao: bytes32_from_b256(&B256::from(inner1.prev_randao)), + block_number: inner1.block_number, + gas_limit: inner1.gas_limit, + gas_used: inner1.gas_used, + timestamp: inner1.timestamp, + extra_data, + base_fee_per_gas: inner1.base_fee_per_gas, + block_hash: hash32_from_b256(&B256::from(inner1.block_hash)), + transactions, + withdrawals, + blob_gas_used: inner3.blob_gas_used, + excess_blob_gas: inner3.excess_blob_gas, + }; + + // convert execution requests + let execution_requests = + convert_execution_requests_to_ssz(&envelope.execution_requests)?; + + Ok(Self { + payload, + execution_requests, + builder_index: envelope.builder_index, + beacon_block_root: hash32_from_b256(&envelope.beacon_block_root), + slot: envelope.slot, + state_root: hash32_from_b256(&envelope.state_root), + }) + } +} + +/// Convert our raw-bytes ExecutionRequests to proper SSZ typed requests. +fn convert_execution_requests_to_ssz( + requests: &rbuilder_primitives::epbs::ExecutionRequests, +) -> eyre::Result { + let mut ssz_requests = SszExecutionRequests::default(); + + for raw in &requests.deposits { + if raw.len() < 192 { + continue; // skip malformed + } + let req = SszDepositRequest { + pubkey: ByteVector::try_from(&raw[0..48]) + .map_err(|e| eyre::eyre!("deposit pubkey: {:?}", e))?, + withdrawal_credentials: Bytes32::try_from(&raw[48..80]) + .map_err(|e| eyre::eyre!("deposit withdrawal_credentials: {:?}", e))?, + amount: u64::from_le_bytes(raw[80..88].try_into().unwrap()), + signature: ByteVector::try_from(&raw[88..184]) + .map_err(|e| eyre::eyre!("deposit signature: {:?}", e))?, + index: u64::from_le_bytes(raw[184..192].try_into().unwrap()), + }; + ssz_requests.deposits.push(req); + } + + for raw in &requests.withdrawals { + if raw.len() < 76 { + continue; + } + let req = SszWithdrawalRequest { + source_address: ExecutionAddress::try_from(&raw[0..20]) + .map_err(|e| eyre::eyre!("withdrawal source_address: {:?}", e))?, + validator_pubkey: ByteVector::try_from(&raw[20..68]) + .map_err(|e| eyre::eyre!("withdrawal validator_pubkey: {:?}", e))?, + amount: u64::from_le_bytes(raw[68..76].try_into().unwrap()), + }; + ssz_requests.withdrawals.push(req); + } + + for raw in &requests.consolidations { + if raw.len() < 116 { + continue; + } + let req = SszConsolidationRequest { + source_address: ExecutionAddress::try_from(&raw[0..20]) + .map_err(|e| eyre::eyre!("consolidation source_address: {:?}", e))?, + source_pubkey: ByteVector::try_from(&raw[20..68]) + .map_err(|e| eyre::eyre!("consolidation source_pubkey: {:?}", e))?, + target_pubkey: ByteVector::try_from(&raw[68..116]) + .map_err(|e| eyre::eyre!("consolidation target_pubkey: {:?}", e))?, + }; + ssz_requests.consolidations.push(req); + } + + Ok(ssz_requests) +} + +// Helper conversion functions + +fn hash32_from_block_hash(h: &BlockHash) -> Hash32 { + Hash32::try_from(h.as_slice()).expect("BlockHash is 32 bytes") +} + +fn hash32_from_b256(h: &B256) -> Hash32 { + Hash32::try_from(h.as_slice()).expect("B256 is 32 bytes") +} + +fn bytes32_from_b256(h: &B256) -> Bytes32 { + Bytes32::try_from(h.as_slice()).expect("B256 is 32 bytes") +} + +fn address_to_execution_address(a: &Address) -> ExecutionAddress { + ExecutionAddress::try_from(a.as_slice()).expect("Address is 20 bytes") +} + +/// Compute the EPBS signing domain from beacon chain genesis data. +/// +/// The domain is computed following the consensus-specs: +/// ```python +/// domain = compute_domain(DOMAIN_BEACON_BUILDER, fork_version, genesis_validators_root) +/// ``` +/// +/// The `fork_version` and `genesis_validators_root` are fetched from the beacon chain +/// via the `/eth/v1/beacon/genesis` endpoint in `config.rs`. +pub fn compute_epbs_domain(fork_version: [u8; 4], genesis_validators_root: B256) -> B256 { + use ethereum_consensus::{ + phase0::beacon_state::ForkData, + primitives::{Root, Version}, + ssz::prelude::*, + }; + + // create ForkData and compute its hash_tree_root + let version = Version::try_from(fork_version.as_slice()).expect("fork_version is 4 bytes"); + let root = Root::try_from(genesis_validators_root.as_slice()).expect("root is 32 bytes"); + + let fork_data = ForkData { + current_version: version, + genesis_validators_root: root, + }; + + let fork_data_root = fork_data + .hash_tree_root() + .expect("ForkData hash_tree_root should not fail"); + + // construcrt domain: DOMAIN_BEACON_BUILDER || fork_data_root[:28] + let mut domain = [0u8; 32]; + domain[0..4].copy_from_slice(&DOMAIN_BEACON_BUILDER); + domain[4..32].copy_from_slice(&fork_data_root[..28]); + + B256::from(domain) +} \ No newline at end of file