Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions crates/rbuilder-primitives/src/evm_inspector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use ahash::HashMap;
use alloy_consensus::Transaction;
use alloy_primitives::{Address, B256, U256};
use alloy_rpc_types::AccessList;
use reth_primitives::{Recovered, TransactionSigned};
use revm::{
bytecode::opcode,
Expand All @@ -10,7 +9,6 @@ use revm::{
interpreter::{interpreter_types::Jumps, CallInputs, CallOutcome, Interpreter},
Inspector,
};
use revm_inspectors::access_list::AccessListInspector;

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct SlotKey {
Expand Down Expand Up @@ -257,7 +255,6 @@ where

#[derive(Debug)]
pub struct RBuilderEVMInspector<'a> {
access_list_inspector: AccessListInspector,
used_state_inspector: Option<UsedStateEVMInspector<'a>>,
}

Expand All @@ -266,23 +263,15 @@ impl<'a> RBuilderEVMInspector<'a> {
tx: &Recovered<TransactionSigned>,
used_state_trace: Option<&'a mut UsedStateTrace>,
) -> Self {
let access_list_inspector =
AccessListInspector::new(tx.access_list().cloned().unwrap_or_default());

let mut used_state_inspector = used_state_trace.map(UsedStateEVMInspector::new);
if let Some(i) = &mut used_state_inspector {
i.use_tx_nonce(tx);
}

Self {
access_list_inspector,
used_state_inspector,
}
}

pub fn into_access_list(self) -> AccessList {
self.access_list_inspector.into_access_list()
}
}

impl<'a, CTX> Inspector<CTX> for RBuilderEVMInspector<'a>
Expand All @@ -292,7 +281,6 @@ where
{
#[inline]
fn step(&mut self, interp: &mut Interpreter, context: &mut CTX) {
self.access_list_inspector.step(interp, context);
if let Some(used_state_inspector) = &mut self.used_state_inspector {
used_state_inspector.step(interp, context);
}
Expand Down
33 changes: 31 additions & 2 deletions crates/rbuilder/src/building/order_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,35 @@ where
Factory: EvmFactory,
{
let tx = tx_with_blobs.internal_tx_unsecure();

// Skip the AccessListInspector entirely — it calls step() on every EVM opcode
// just to track accessed addresses for the blocklist check. Instead, we check
// the blocklist against ResultAndState.state (EvmState = HashMap<Address, Account>)
// which already contains every address touched during execution.
// This eliminates ~50% of CPU overhead during block building.
if used_state_tracer.is_none() {
let mut evm = evm_factory.create_evm(db, evm_env);
let res = match evm.transact(tx) {
Ok(res) => res,
Err(err) => match err {
EVMError::Transaction(tx_err) => {
return Ok(Err(TransactionErr::InvalidTransaction(tx_err)))
}
EVMError::Database(_) | EVMError::Header(_) | EVMError::Custom(_) => {
return Err(err.into())
}
},
};
// Check blocklist against addresses in the execution state diff
if !blocklist.is_empty() && res.state.keys().any(|addr| blocklist.contains(addr)) {
return Ok(Err(TransactionErr::Blocklist));
}
return Ok(Ok(res));
}

// Slow path: used_state_tracer is active (parallel builder conflict detection).
// Still need the inspector for UsedStateEVMInspector, but we can skip AccessListInspector
// and use the state diff for blocklist checking instead.
let mut rbuilder_inspector = RBuilderEVMInspector::new(tx, used_state_tracer);

let mut evm = evm_factory.create_evm_with_inspector(db, evm_env, &mut rbuilder_inspector);
Expand All @@ -1177,8 +1206,8 @@ where
},
};
drop(evm);
let access_list = rbuilder_inspector.into_access_list();
if access_list.flatten().any(|(a, _)| blocklist.contains(&a)) {
// Use state diff for blocklist check instead of access list
if !blocklist.is_empty() && res.state.keys().any(|addr| blocklist.contains(addr)) {
return Ok(Err(TransactionErr::Blocklist));
}

Expand Down
117 changes: 76 additions & 41 deletions crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ use rbuilder_primitives::{
use std::{pin::pin, sync::Arc, time::Instant};
use time::OffsetDateTime;
use tokio::{
sync::{mpsc, mpsc::error::SendTimeoutError},
sync::{mpsc, mpsc::error::SendTimeoutError, Semaphore},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, trace};
use tracing::{error, info, trace, warn};

/// Max concurrent tx fetch requests to prevent overwhelming the RPC connection.
const TX_FETCH_CONCURRENCY: usize = 64;

/// Subscribes to the EL mempool and pushes new txs as orders into `results`.
/// This version supports EIP-4844 txs: it subscribes via subscribe_pending_txs to receive hashes, then calls eth_getRawTransactionByHash
/// to fetch the raw tx which, for a 4844 tx, may include blobs.
/// In the future we may consider updating reth so blob txs can be processed in a separate task, avoiding slowdown for non-blob txs.
///
/// Uses a separate IPC connection for RPC calls and fetches transactions
/// concurrently to avoid blocking the subscription stream.
pub async fn subscribe_to_txpool_with_blobs(
config: OrderInputConfig,
results: mpsc::Sender<ReplaceableOrderPoolCommand>,
Expand All @@ -29,70 +34,100 @@ pub async fn subscribe_to_txpool_with_blobs(
.mempool_source
.ok_or_else(|| eyre::eyre!("No txpool source configured"))?;

let provider = match mempool {
// Create TWO connections: one for subscription stream, one for RPC calls.
// This prevents sequential get_raw_transaction_by_hash calls from blocking
// the subscription consumer and causing backpressure on the IPC socket.
let (sub_provider, rpc_provider) = match &mempool {
MempoolSource::Ipc(path) => {
let ipc = IpcConnect::new(path);
ProviderBuilder::new().connect_ipc(ipc).await?
let ipc1 = IpcConnect::new(path.clone());
let ipc2 = IpcConnect::new(path.clone());
let p1 = ProviderBuilder::new().connect_ipc(ipc1).await?;
let p2 = ProviderBuilder::new().connect_ipc(ipc2).await?;
(p1, Arc::new(p2))
}
MempoolSource::Ws(url) => {
let ws_conn = alloy_provider::WsConnect::new(url);
ProviderBuilder::new().connect_ws(ws_conn).await?
let ws1 = alloy_provider::WsConnect::new(url.clone());
let ws2 = alloy_provider::WsConnect::new(url.clone());
let p1 = ProviderBuilder::new().connect_ws(ws1).await?;
let p2 = ProviderBuilder::new().connect_ws(ws2).await?;
(p1, Arc::new(p2))
}
};

let handle = tokio::spawn(async move {
info!("Subscribe to txpool with blobs: started");

let stream = match provider.subscribe_pending_transactions().await {
let stream = match sub_provider
.subscribe_pending_transactions()
.channel_size(16384)
.await
{
Ok(stream) => stream.into_stream().take_until(global_cancel.cancelled()),
Err(err) => {
error!(?err, "Failed to subscribe to ipc txpool stream");
// Closing builder because this job is critical so maybe restart will help
global_cancel.cancel();
return;
}
};
let mut stream = pin!(stream);

while let Some(tx_hash) = stream.next().await {
let received_at = OffsetDateTime::now_utc();
let start = Instant::now();
// Semaphore limits concurrent fetch tasks to avoid overwhelming the RPC connection
let semaphore = Arc::new(Semaphore::new(TX_FETCH_CONCURRENCY));

let tx_with_blobs = match get_tx_with_blobs(tx_hash, &provider).await {
Ok(Some(tx_with_blobs)) => tx_with_blobs,
Ok(None) => {
trace!(?tx_hash, "tx not found in tx pool");
continue;
}
Err(err) => {
error!(?tx_hash, ?err, "Failed to get tx from pool");
// Consume tx hash notifications as fast as possible, spawning concurrent fetch tasks
while let Some(tx_hash) = stream.next().await {
let permit = match semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
// All fetch slots busy — drop this hash rather than blocking the stream
trace!(?tx_hash, "tx fetch concurrency limit reached, dropping hash");
continue;
}
};

let tx = MempoolTx::new(tx_with_blobs);
let order = Order::Tx(tx);
let parse_duration = start.elapsed();
trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), "Mempool transaction received with blobs");
add_txfetcher_time_to_query(parse_duration);

let orderpool_command = ReplaceableOrderPoolCommand::Order(Arc::new(order));
mark_command_received(&orderpool_command, received_at);
match results
.send_timeout(orderpool_command, config.results_channel_timeout)
.await
{
Ok(()) => {}
Err(SendTimeoutError::Timeout(_)) => {
error!("Failed to send txpool tx to results channel, timeout");
}
Err(SendTimeoutError::Closed(_)) => {
break;
let provider = rpc_provider.clone();
let results = results.clone();
let config_timeout = config.results_channel_timeout;

tokio::spawn(async move {
let _permit = permit; // held until task completes
let received_at = OffsetDateTime::now_utc();
let start = Instant::now();

let tx_with_blobs = match get_tx_with_blobs(tx_hash, provider.as_ref()).await {
Ok(Some(tx)) => tx,
Ok(None) => {
trace!(?tx_hash, "tx not found in tx pool");
return;
}
Err(err) => {
trace!(?tx_hash, ?err, "Failed to get tx from pool");
return;
}
};

let tx = MempoolTx::new(tx_with_blobs);
let order = Order::Tx(tx);
let parse_duration = start.elapsed();
trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), "Mempool transaction received with blobs");
add_txfetcher_time_to_query(parse_duration);

let orderpool_command = ReplaceableOrderPoolCommand::Order(Arc::new(order));
mark_command_received(&orderpool_command, received_at);
match results
.send_timeout(orderpool_command, config_timeout)
.await
{
Ok(()) => {}
Err(SendTimeoutError::Timeout(_)) => {
warn!("Failed to send txpool tx to results channel, timeout");
}
Err(SendTimeoutError::Closed(_)) => {}
}
}
});
}

// stream is closed, cancelling token because builder can't work without this stream
// stream ended, cancelling token because builder can't work without this stream
global_cancel.cancel();
info!("Subscribe to txpool: finished");
});
Expand Down