diff --git a/crates/rbuilder/src/backtest/execute.rs b/crates/rbuilder/src/backtest/execute.rs
index caf475ca7..4114e69ea 100644
--- a/crates/rbuilder/src/backtest/execute.rs
+++ b/crates/rbuilder/src/backtest/execute.rs
@@ -87,6 +87,8 @@ where
Ok(ctx)
}
+/// @Pending: change available_orders to some struct that allows to tell the difference between
+/// mempool txs and private txs.
pub fn backtest_prepare_orders_from_building_context
(
ctx: BlockBuildingContext,
available_orders: Vec,
@@ -100,7 +102,10 @@ where
.map(|order| Arc::clone(&order.order))
.collect();
for order in &orders {
- ctx.mempool_tx_detector.add_tx(order);
+ if let Order::Tx(mempool_tx) = order.as_ref() {
+ ctx.mempool_tx_detector
+ .add_tx(mempool_tx.tx_with_blobs.hash());
+ }
}
let (sim_orders, sim_errors) =
diff --git a/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs b/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs
index 27875481a..c88ed212c 100644
--- a/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs
+++ b/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs
@@ -6,9 +6,8 @@ use std::{
use alloy_primitives::{utils::format_ether, I256, U256};
-use crate::{
- building::{builders::block_building_helper::BlockBuildingHelper, ThreadBlockBuildingContext},
- live_builder::order_input::mempool_txs_detector::MempoolTxsDetector,
+use crate::building::{
+ builders::block_building_helper::BlockBuildingHelper, ThreadBlockBuildingContext,
};
use rbuilder_primitives::{order_statistics::OrderStatistics, OrderId, SimulatedOrder};
@@ -95,10 +94,13 @@ impl<'a> BlockBuildingHelperStatsLogger<'a> {
pub fn print(&self, orders: Vec>) {
let mut order_id_to_order = HashMap::new();
- let mempool_txs_detector = MempoolTxsDetector::new();
+ let mempool_txs_detector = self
+ .block_building_helper
+ .building_context()
+ .mempool_tx_detector
+ .clone();
for sim_order in &orders {
order_id_to_order.insert(sim_order.id(), sim_order.clone());
- mempool_txs_detector.add_tx(&sim_order.order);
}
println!(
diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs
index 06be7a61e..74ebb555c 100644
--- a/crates/rbuilder/src/building/mod.rs
+++ b/crates/rbuilder/src/building/mod.rs
@@ -148,6 +148,7 @@ impl BlockBuildingContext {
faster_finalize: bool,
mev_blocker_price: U256,
adjustment_fee_payers: ahash::HashSet,
+ mempool_tx_detector: Arc,
) -> Option {
let attributes = EthPayloadBuilderAttributes::try_new(
attributes.data.parent_block_hash,
@@ -219,7 +220,7 @@ impl BlockBuildingContext {
shared_cached_reads: Default::default(),
tx_execution_cache: Arc::new(TxExecutionCache::new(evm_caching_enable)),
max_blob_gas_per_block,
- mempool_tx_detector: Arc::new(MempoolTxsDetector::new()),
+ mempool_tx_detector,
faster_finalize,
mev_blocker_price,
adjustment_fee_payers,
@@ -1319,9 +1320,7 @@ mod test {
let detector = MempoolTxsDetector::new();
let mut data_gen = TestDataGenerator::default();
let tx1 = data_gen.create_tx_with_blobs_nonce(Default::default());
- detector.add_tx(&Order::Tx(MempoolTx {
- tx_with_blobs: tx1.clone(),
- }));
+ detector.add_tx(tx1.hash());
let tx2 = data_gen.create_tx_with_blobs_nonce(Default::default());
let profit_1 = I256::unchecked_from(1000);
let profit_2 = I256::unchecked_from(10000);
@@ -1371,7 +1370,7 @@ mod test {
let order = Order::Tx(MempoolTx {
tx_with_blobs: tx.clone(),
});
- detector.add_tx(&order);
+ detector.add_tx(tx.hash());
let profit = I256::unchecked_from(1000);
let order_ok = OrderOk {
coinbase_profit: profit.unsigned_abs(),
diff --git a/crates/rbuilder/src/building/testing/test_chain_state.rs b/crates/rbuilder/src/building/testing/test_chain_state.rs
index 3056e4753..11ac87655 100644
--- a/crates/rbuilder/src/building/testing/test_chain_state.rs
+++ b/crates/rbuilder/src/building/testing/test_chain_state.rs
@@ -1,6 +1,8 @@
use crate::{
building::BlockBuildingContext,
- live_builder::block_list_provider::BlockList,
+ live_builder::{
+ block_list_provider::BlockList, order_input::mempool_txs_detector::MempoolTxsDetector,
+ },
provider::RootHasher,
roothash::RootHashContext,
utils::{RootHasherImpl, Signer},
@@ -420,6 +422,7 @@ impl TestBlockContextBuilder {
true,
self.mev_block_price,
Default::default(),
+ Arc::new(MempoolTxsDetector::new()),
)
.unwrap()
}
diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs
index 6a9c7e341..2cc0a26c2 100644
--- a/crates/rbuilder/src/live_builder/base_config.rs
+++ b/crates/rbuilder/src/live_builder/base_config.rs
@@ -6,7 +6,7 @@ use crate::{
order_flow_tracing::order_flow_tracer_manager::{
NullOrderFlowTracerManager, OrderFlowTracerManager, OrderFlowTracerManagerImpl,
},
- order_input::OrderInputConfig,
+ order_input::{mempool_txs_detector::MempoolTxsDetector, OrderInputConfig},
process_killer::ProcessKiller,
LiveBuilder,
},
@@ -278,6 +278,8 @@ impl BaseConfig {
faster_finalize: self.faster_finalize,
order_flow_tracer_manager,
order_journal_observer_factory: Box::new(NullOrderJournalObserverFactory {}),
+
+ mempool_detector: Arc::new(MempoolTxsDetector::new()),
})
}
diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs
index 4835a2df1..647aa09ed 100644
--- a/crates/rbuilder/src/live_builder/building/mod.rs
+++ b/crates/rbuilder/src/live_builder/building/mod.rs
@@ -86,7 +86,6 @@ where
/// Connects OrdersForBlock (source of orders) ->
/// [Optional] OrderFlowTracerManager provided tracer ->
- /// ReplaceableOrderStreamSniffer (notifies mempool txs to MempoolTxsDetector) ->
/// BlobTypeOrderFilter (filters out Orders with incorrect blobs (pre/post fusaka)) ->
/// OrderReplacementManager (Handles cancellations and replacements) -> Simulations and calls start_building_job
pub fn start_block_building(
@@ -135,17 +134,10 @@ where
)))
};
- let mempool_txs_detector_sniffer =
- order_input::mempool_txs_detector::ReplaceableOrderStreamSniffer::new(
- blob_type_order_filter,
- block_ctx.mempool_tx_detector.clone(),
- );
-
// order_flow_tracer_manager may add some extra ReplaceableOrderSink on the chain.
- let (sim_tracer, order_flow_input) = self.order_flow_tracer_manager.create_tracers(
- payload.slot_block_id(),
- Box::new(mempool_txs_detector_sniffer),
- );
+ let (sim_tracer, order_flow_input) = self
+ .order_flow_tracer_manager
+ .create_tracers(payload.slot_block_id(), blob_type_order_filter);
// sink removal is automatic via OrderSink::is_alive false
let _block_sub = self
diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs
index 0466f670f..faa16711f 100644
--- a/crates/rbuilder/src/live_builder/mod.rs
+++ b/crates/rbuilder/src/live_builder/mod.rs
@@ -38,7 +38,7 @@ use building::BlockBuildingPool;
use eyre::Context;
use futures::{stream::FuturesUnordered, StreamExt};
use jsonrpsee::RpcModule;
-use order_input::ReplaceableOrderPoolCommand;
+use order_input::{mempool_txs_detector::MempoolTxsDetector, ReplaceableOrderPoolCommand};
use payload_events::{InternalPayloadId, MevBoostSlotDataGenerator};
use rbuilder_primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs};
use reth::transaction_pool::{
@@ -147,6 +147,8 @@ where
pub order_flow_tracer_manager: Box,
pub order_journal_observer_factory: Box,
+
+ pub mempool_detector: Arc,
}
impl LiveBuilder
@@ -230,6 +232,7 @@ where
let (header_sender, header_receiver) = mpsc::channel(CLEAN_TASKS_CHANNEL_SIZE);
+ let mempool_detector = self.mempool_detector.clone();
let orderpool_subscriber = {
let (handle, sub) = start_orderpool_jobs(
self.order_input_config,
@@ -239,6 +242,7 @@ where
self.orderpool_sender,
self.orderpool_receiver,
header_receiver,
+ mempool_detector.clone(),
)
.await?;
inner_jobs_handles.push(handle);
@@ -370,6 +374,7 @@ where
.iter()
.filter_map(|(_, r)| r.adjustment_fee_payer)
.collect(),
+ mempool_detector.clone(),
) {
mark_building_started(block_ctx.timestamp());
builder_pool.start_block_building(
@@ -401,13 +406,14 @@ where
T: TransactionOrdering::Transaction>,
S: BlobStore,
{
+ let detector = self.mempool_detector.clone();
// Initialize the orderpool with every item in the reth pool.
for tx in pool
.all_transactions()
.pending_recovered()
.chain(pool.all_transactions().queued_recovered())
{
- try_send_to_orderpool(tx, self.orderpool_sender.clone(), pool.clone()).await;
+ try_send_to_orderpool(tx, self.orderpool_sender.clone(), pool.clone(), &detector).await;
}
// Subscribe to new transactions in-process.
@@ -416,7 +422,7 @@ where
tokio::spawn(async move {
while let Some(e) = recv.recv().await {
let tx = e.transaction.transaction.transaction().clone();
- try_send_to_orderpool(tx, orderpool_sender.clone(), pool.clone()).await;
+ try_send_to_orderpool(tx, orderpool_sender.clone(), pool.clone(), &detector).await;
}
});
@@ -537,6 +543,7 @@ async fn try_send_to_orderpool(
tx: Recovered,
orderpool_sender: mpsc::Sender,
pool: Pool,
+ mempool_detector: &Arc,
) where
V: TransactionValidator + 'static,
T: TransactionOrdering::Transaction>,
@@ -544,9 +551,12 @@ async fn try_send_to_orderpool(
{
match TransactionSignedEcRecoveredWithBlobs::try_from_tx_without_blobs_and_pool(tx, pool) {
Ok(tx) => {
+ let tx_hash = tx.hash();
+ mempool_detector.add_tx(tx_hash);
let order = Order::Tx(MempoolTx::new(tx));
let command = ReplaceableOrderPoolCommand::Order(Arc::new(order));
if let Err(e) = orderpool_sender.send(command).await {
+ mempool_detector.remove_tx(tx_hash);
error!("Error sending order to orderpool: {:#}", e);
}
}
diff --git a/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs b/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs
index c47f2c4a2..c6f8149eb 100644
--- a/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs
+++ b/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs
@@ -1,44 +1,8 @@
-use std::sync::Arc;
-
use ahash::RandomState;
use alloy_primitives::TxHash;
use dashmap::DashSet;
-use rbuilder_primitives::{BundleReplacementData, Order, TransactionSignedEcRecoveredWithBlobs};
-
-use super::replaceable_order_sink::ReplaceableOrderSink;
-
-/// Get's in the middle of a ReplaceableOrder stream a feeds a MempoolTxsDetector.
-#[derive(Debug)]
-pub struct ReplaceableOrderStreamSniffer {
- detector: Arc,
- sink: Box,
-}
-
-impl ReplaceableOrderStreamSniffer {
- pub fn new(sink: Box, detector: Arc) -> Self {
- Self { detector, sink }
- }
-
- pub fn detector(&self) -> Arc {
- self.detector.clone()
- }
-}
-
-impl ReplaceableOrderSink for ReplaceableOrderStreamSniffer {
- fn insert_order(&mut self, order: Arc) -> bool {
- self.detector.add_tx(&order);
- self.sink.insert_order(order)
- }
-
- fn remove_bundle(&mut self, replacement_data: BundleReplacementData) -> bool {
- self.sink.remove_bundle(replacement_data)
- }
-
- fn is_alive(&self) -> bool {
- self.sink.is_alive()
- }
-}
+use rbuilder_primitives::TransactionSignedEcRecoveredWithBlobs;
/// Given a TransactionSignedEcRecoveredWithBlobs answers if the tx is from the mempool or not.
/// Current implementation is super simple, it just checks the tx hash against a set of hashes.
@@ -54,10 +18,12 @@ impl MempoolTxsDetector {
}
}
- pub fn add_tx(&self, order: &Order) {
- if let Order::Tx(mempool_tx) = order {
- self.mempool_txs.insert(mempool_tx.tx_with_blobs.hash());
- }
+ pub fn add_tx(&self, hash: TxHash) {
+ self.mempool_txs.insert(hash);
+ }
+
+ pub fn remove_tx(&self, hash: TxHash) {
+ self.mempool_txs.remove(&hash);
}
pub fn is_mempool(&self, tx: &TransactionSignedEcRecoveredWithBlobs) -> bool {
diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs
index 2651f02cd..2e75ca451 100644
--- a/crates/rbuilder/src/live_builder/order_input/mod.rs
+++ b/crates/rbuilder/src/live_builder/order_input/mod.rs
@@ -233,6 +233,7 @@ impl ReplaceableOrderPoolCommand {
/// - Clean up task to remove old stuff.
///
/// @Pending reengineering to modularize rpc, extra_rpc here is a patch to upgrade the created rpc server.
+#[allow(clippy::too_many_arguments)]
pub async fn start_orderpool_jobs(
config: OrderInputConfig,
provider_factory: P,
@@ -241,6 +242,7 @@ pub async fn start_orderpool_jobs
(
order_sender: mpsc::Sender,
order_receiver: mpsc::Receiver,
header_receiver: mpsc::Receiver,
+ mempool_detector: Arc,
) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)>
where
P: StateProviderFactory + 'static,
@@ -252,7 +254,10 @@ where
warn!("ignore_blobs is set to true, some order input is ignored");
}
- let orderpool = Arc::new(Mutex::new(OrderPool::new(config.time_to_keep_mempool_txs)));
+ let orderpool = Arc::new(Mutex::new(OrderPool::new(
+ config.time_to_keep_mempool_txs,
+ mempool_detector.clone(),
+ )));
let subscriber = OrderPoolSubscriber {
orderpool: orderpool.clone(),
};
@@ -279,6 +284,7 @@ where
let txpool_fetcher = txpool_fetcher::subscribe_to_txpool_with_blobs(
config.clone(),
order_sender.clone(),
+ mempool_detector.clone(),
global_cancel.clone(),
)
.await?;
diff --git a/crates/rbuilder/src/live_builder/order_input/orderpool.rs b/crates/rbuilder/src/live_builder/order_input/orderpool.rs
index a3360d383..eb268b330 100644
--- a/crates/rbuilder/src/live_builder/order_input/orderpool.rs
+++ b/crates/rbuilder/src/live_builder/order_input/orderpool.rs
@@ -1,4 +1,5 @@
use super::{
+ mempool_txs_detector::MempoolTxsDetector,
order_sink::{OrderPoolCommand, OrderSender2OrderSink},
replaceable_order_sink::ReplaceableOrderSink,
ReplaceableOrderPoolCommand,
@@ -94,11 +95,15 @@ pub struct OrderPool {
next_sink_id: u64,
/// After this time a mempool tx is dropped.
time_to_keep_mempool_txs: Duration,
+ mempool_detector: Arc,
}
impl OrderPool {
/// Instantiate a new OrderPool.
- pub fn new(time_to_keep_mempool_txs: Duration) -> Self {
+ pub fn new(
+ time_to_keep_mempool_txs: Duration,
+ mempool_detector: Arc,
+ ) -> Self {
OrderPool {
mempool_txs: Vec::new(),
bundles_by_target_block: HashMap::default(),
@@ -109,6 +114,7 @@ impl OrderPool {
bundle_cancellations: Default::default(),
time_to_keep_mempool_txs,
mempool_txs_size: 0,
+ mempool_detector,
}
}
@@ -266,6 +272,10 @@ impl OrderPool {
Self::must_retain_order(time, order, new_state, &self.time_to_keep_mempool_txs);
if !retain {
self.mempool_txs_size -= Self::measure_tx(order);
+ if let Order::Tx(mempool_tx) = order.as_ref() {
+ self.mempool_detector
+ .remove_tx(mempool_tx.tx_with_blobs.hash());
+ }
}
retain
});
diff --git a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs
index d3b794fed..3c47a2e8f 100644
--- a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs
+++ b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs
@@ -23,6 +23,7 @@ use tracing::{error, info, trace};
pub async fn subscribe_to_txpool_with_blobs(
config: OrderInputConfig,
results: mpsc::Sender,
+ mempool_detector: Arc,
global_cancel: CancellationToken,
) -> eyre::Result> {
let mempool = config
@@ -69,7 +70,6 @@ pub async fn subscribe_to_txpool_with_blobs(
continue;
}
};
-
let tx = MempoolTx::new(tx_with_blobs);
let order = Order::Tx(tx);
let parse_duration = start.elapsed();
@@ -82,7 +82,9 @@ pub async fn subscribe_to_txpool_with_blobs(
.send_timeout(orderpool_command, config.results_channel_timeout)
.await
{
- Ok(()) => {}
+ Ok(()) => {
+ mempool_detector.add_tx(tx_hash);
+ }
Err(SendTimeoutError::Timeout(_)) => {
error!("Failed to send txpool tx to results channel, timeout");
}
@@ -140,6 +142,9 @@ mod test {
..OrderInputConfig::default_e2e()
},
sender,
+ Arc::new(
+ crate::live_builder::order_input::mempool_txs_detector::MempoolTxsDetector::new(),
+ ),
CancellationToken::new(),
)
.await