diff --git a/crates/rbuilder/src/backtest/execute.rs b/crates/rbuilder/src/backtest/execute.rs index caf475ca7..4114e69ea 100644 --- a/crates/rbuilder/src/backtest/execute.rs +++ b/crates/rbuilder/src/backtest/execute.rs @@ -87,6 +87,8 @@ where Ok(ctx) } +/// @Pending: change available_orders to some struct that allows to tell the difference between +/// mempool txs and private txs. pub fn backtest_prepare_orders_from_building_context

( ctx: BlockBuildingContext, available_orders: Vec, @@ -100,7 +102,10 @@ where .map(|order| Arc::clone(&order.order)) .collect(); for order in &orders { - ctx.mempool_tx_detector.add_tx(order); + if let Order::Tx(mempool_tx) = order.as_ref() { + ctx.mempool_tx_detector + .add_tx(mempool_tx.tx_with_blobs.hash()); + } } let (sim_orders, sim_errors) = diff --git a/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs b/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs index 27875481a..c88ed212c 100644 --- a/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs +++ b/crates/rbuilder/src/building/builders/block_building_helper_stats_logger.rs @@ -6,9 +6,8 @@ use std::{ use alloy_primitives::{utils::format_ether, I256, U256}; -use crate::{ - building::{builders::block_building_helper::BlockBuildingHelper, ThreadBlockBuildingContext}, - live_builder::order_input::mempool_txs_detector::MempoolTxsDetector, +use crate::building::{ + builders::block_building_helper::BlockBuildingHelper, ThreadBlockBuildingContext, }; use rbuilder_primitives::{order_statistics::OrderStatistics, OrderId, SimulatedOrder}; @@ -95,10 +94,13 @@ impl<'a> BlockBuildingHelperStatsLogger<'a> { pub fn print(&self, orders: Vec>) { let mut order_id_to_order = HashMap::new(); - let mempool_txs_detector = MempoolTxsDetector::new(); + let mempool_txs_detector = self + .block_building_helper + .building_context() + .mempool_tx_detector + .clone(); for sim_order in &orders { order_id_to_order.insert(sim_order.id(), sim_order.clone()); - mempool_txs_detector.add_tx(&sim_order.order); } println!( diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs index 06be7a61e..74ebb555c 100644 --- a/crates/rbuilder/src/building/mod.rs +++ b/crates/rbuilder/src/building/mod.rs @@ -148,6 +148,7 @@ impl BlockBuildingContext { faster_finalize: bool, mev_blocker_price: U256, adjustment_fee_payers: ahash::HashSet

, + mempool_tx_detector: Arc, ) -> Option { let attributes = EthPayloadBuilderAttributes::try_new( attributes.data.parent_block_hash, @@ -219,7 +220,7 @@ impl BlockBuildingContext { shared_cached_reads: Default::default(), tx_execution_cache: Arc::new(TxExecutionCache::new(evm_caching_enable)), max_blob_gas_per_block, - mempool_tx_detector: Arc::new(MempoolTxsDetector::new()), + mempool_tx_detector, faster_finalize, mev_blocker_price, adjustment_fee_payers, @@ -1319,9 +1320,7 @@ mod test { let detector = MempoolTxsDetector::new(); let mut data_gen = TestDataGenerator::default(); let tx1 = data_gen.create_tx_with_blobs_nonce(Default::default()); - detector.add_tx(&Order::Tx(MempoolTx { - tx_with_blobs: tx1.clone(), - })); + detector.add_tx(tx1.hash()); let tx2 = data_gen.create_tx_with_blobs_nonce(Default::default()); let profit_1 = I256::unchecked_from(1000); let profit_2 = I256::unchecked_from(10000); @@ -1371,7 +1370,7 @@ mod test { let order = Order::Tx(MempoolTx { tx_with_blobs: tx.clone(), }); - detector.add_tx(&order); + detector.add_tx(tx.hash()); let profit = I256::unchecked_from(1000); let order_ok = OrderOk { coinbase_profit: profit.unsigned_abs(), diff --git a/crates/rbuilder/src/building/testing/test_chain_state.rs b/crates/rbuilder/src/building/testing/test_chain_state.rs index 3056e4753..11ac87655 100644 --- a/crates/rbuilder/src/building/testing/test_chain_state.rs +++ b/crates/rbuilder/src/building/testing/test_chain_state.rs @@ -1,6 +1,8 @@ use crate::{ building::BlockBuildingContext, - live_builder::block_list_provider::BlockList, + live_builder::{ + block_list_provider::BlockList, order_input::mempool_txs_detector::MempoolTxsDetector, + }, provider::RootHasher, roothash::RootHashContext, utils::{RootHasherImpl, Signer}, @@ -420,6 +422,7 @@ impl TestBlockContextBuilder { true, self.mev_block_price, Default::default(), + Arc::new(MempoolTxsDetector::new()), ) .unwrap() } diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 6a9c7e341..2cc0a26c2 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -6,7 +6,7 @@ use crate::{ order_flow_tracing::order_flow_tracer_manager::{ NullOrderFlowTracerManager, OrderFlowTracerManager, OrderFlowTracerManagerImpl, }, - order_input::OrderInputConfig, + order_input::{mempool_txs_detector::MempoolTxsDetector, OrderInputConfig}, process_killer::ProcessKiller, LiveBuilder, }, @@ -278,6 +278,8 @@ impl BaseConfig { faster_finalize: self.faster_finalize, order_flow_tracer_manager, order_journal_observer_factory: Box::new(NullOrderJournalObserverFactory {}), + + mempool_detector: Arc::new(MempoolTxsDetector::new()), }) } diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index 4835a2df1..647aa09ed 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -86,7 +86,6 @@ where /// Connects OrdersForBlock (source of orders) -> /// [Optional] OrderFlowTracerManager provided tracer -> - /// ReplaceableOrderStreamSniffer (notifies mempool txs to MempoolTxsDetector) -> /// BlobTypeOrderFilter (filters out Orders with incorrect blobs (pre/post fusaka)) -> /// OrderReplacementManager (Handles cancellations and replacements) -> Simulations and calls start_building_job pub fn start_block_building( @@ -135,17 +134,10 @@ where ))) }; - let mempool_txs_detector_sniffer = - order_input::mempool_txs_detector::ReplaceableOrderStreamSniffer::new( - blob_type_order_filter, - block_ctx.mempool_tx_detector.clone(), - ); - // order_flow_tracer_manager may add some extra ReplaceableOrderSink on the chain. - let (sim_tracer, order_flow_input) = self.order_flow_tracer_manager.create_tracers( - payload.slot_block_id(), - Box::new(mempool_txs_detector_sniffer), - ); + let (sim_tracer, order_flow_input) = self + .order_flow_tracer_manager + .create_tracers(payload.slot_block_id(), blob_type_order_filter); // sink removal is automatic via OrderSink::is_alive false let _block_sub = self diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 0466f670f..faa16711f 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -38,7 +38,7 @@ use building::BlockBuildingPool; use eyre::Context; use futures::{stream::FuturesUnordered, StreamExt}; use jsonrpsee::RpcModule; -use order_input::ReplaceableOrderPoolCommand; +use order_input::{mempool_txs_detector::MempoolTxsDetector, ReplaceableOrderPoolCommand}; use payload_events::{InternalPayloadId, MevBoostSlotDataGenerator}; use rbuilder_primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs}; use reth::transaction_pool::{ @@ -147,6 +147,8 @@ where pub order_flow_tracer_manager: Box, pub order_journal_observer_factory: Box, + + pub mempool_detector: Arc, } impl

LiveBuilder

@@ -230,6 +232,7 @@ where let (header_sender, header_receiver) = mpsc::channel(CLEAN_TASKS_CHANNEL_SIZE); + let mempool_detector = self.mempool_detector.clone(); let orderpool_subscriber = { let (handle, sub) = start_orderpool_jobs( self.order_input_config, @@ -239,6 +242,7 @@ where self.orderpool_sender, self.orderpool_receiver, header_receiver, + mempool_detector.clone(), ) .await?; inner_jobs_handles.push(handle); @@ -370,6 +374,7 @@ where .iter() .filter_map(|(_, r)| r.adjustment_fee_payer) .collect(), + mempool_detector.clone(), ) { mark_building_started(block_ctx.timestamp()); builder_pool.start_block_building( @@ -401,13 +406,14 @@ where T: TransactionOrdering::Transaction>, S: BlobStore, { + let detector = self.mempool_detector.clone(); // Initialize the orderpool with every item in the reth pool. for tx in pool .all_transactions() .pending_recovered() .chain(pool.all_transactions().queued_recovered()) { - try_send_to_orderpool(tx, self.orderpool_sender.clone(), pool.clone()).await; + try_send_to_orderpool(tx, self.orderpool_sender.clone(), pool.clone(), &detector).await; } // Subscribe to new transactions in-process. @@ -416,7 +422,7 @@ where tokio::spawn(async move { while let Some(e) = recv.recv().await { let tx = e.transaction.transaction.transaction().clone(); - try_send_to_orderpool(tx, orderpool_sender.clone(), pool.clone()).await; + try_send_to_orderpool(tx, orderpool_sender.clone(), pool.clone(), &detector).await; } }); @@ -537,6 +543,7 @@ async fn try_send_to_orderpool( tx: Recovered, orderpool_sender: mpsc::Sender, pool: Pool, + mempool_detector: &Arc, ) where V: TransactionValidator + 'static, T: TransactionOrdering::Transaction>, @@ -544,9 +551,12 @@ async fn try_send_to_orderpool( { match TransactionSignedEcRecoveredWithBlobs::try_from_tx_without_blobs_and_pool(tx, pool) { Ok(tx) => { + let tx_hash = tx.hash(); + mempool_detector.add_tx(tx_hash); let order = Order::Tx(MempoolTx::new(tx)); let command = ReplaceableOrderPoolCommand::Order(Arc::new(order)); if let Err(e) = orderpool_sender.send(command).await { + mempool_detector.remove_tx(tx_hash); error!("Error sending order to orderpool: {:#}", e); } } diff --git a/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs b/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs index c47f2c4a2..c6f8149eb 100644 --- a/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs +++ b/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs @@ -1,44 +1,8 @@ -use std::sync::Arc; - use ahash::RandomState; use alloy_primitives::TxHash; use dashmap::DashSet; -use rbuilder_primitives::{BundleReplacementData, Order, TransactionSignedEcRecoveredWithBlobs}; - -use super::replaceable_order_sink::ReplaceableOrderSink; - -/// Get's in the middle of a ReplaceableOrder stream a feeds a MempoolTxsDetector. -#[derive(Debug)] -pub struct ReplaceableOrderStreamSniffer { - detector: Arc, - sink: Box, -} - -impl ReplaceableOrderStreamSniffer { - pub fn new(sink: Box, detector: Arc) -> Self { - Self { detector, sink } - } - - pub fn detector(&self) -> Arc { - self.detector.clone() - } -} - -impl ReplaceableOrderSink for ReplaceableOrderStreamSniffer { - fn insert_order(&mut self, order: Arc) -> bool { - self.detector.add_tx(&order); - self.sink.insert_order(order) - } - - fn remove_bundle(&mut self, replacement_data: BundleReplacementData) -> bool { - self.sink.remove_bundle(replacement_data) - } - - fn is_alive(&self) -> bool { - self.sink.is_alive() - } -} +use rbuilder_primitives::TransactionSignedEcRecoveredWithBlobs; /// Given a TransactionSignedEcRecoveredWithBlobs answers if the tx is from the mempool or not. /// Current implementation is super simple, it just checks the tx hash against a set of hashes. @@ -54,10 +18,12 @@ impl MempoolTxsDetector { } } - pub fn add_tx(&self, order: &Order) { - if let Order::Tx(mempool_tx) = order { - self.mempool_txs.insert(mempool_tx.tx_with_blobs.hash()); - } + pub fn add_tx(&self, hash: TxHash) { + self.mempool_txs.insert(hash); + } + + pub fn remove_tx(&self, hash: TxHash) { + self.mempool_txs.remove(&hash); } pub fn is_mempool(&self, tx: &TransactionSignedEcRecoveredWithBlobs) -> bool { diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index 2651f02cd..2e75ca451 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -233,6 +233,7 @@ impl ReplaceableOrderPoolCommand { /// - Clean up task to remove old stuff. /// /// @Pending reengineering to modularize rpc, extra_rpc here is a patch to upgrade the created rpc server. +#[allow(clippy::too_many_arguments)] pub async fn start_orderpool_jobs

( config: OrderInputConfig, provider_factory: P, @@ -241,6 +242,7 @@ pub async fn start_orderpool_jobs

( order_sender: mpsc::Sender, order_receiver: mpsc::Receiver, header_receiver: mpsc::Receiver

, + mempool_detector: Arc, ) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)> where P: StateProviderFactory + 'static, @@ -252,7 +254,10 @@ where warn!("ignore_blobs is set to true, some order input is ignored"); } - let orderpool = Arc::new(Mutex::new(OrderPool::new(config.time_to_keep_mempool_txs))); + let orderpool = Arc::new(Mutex::new(OrderPool::new( + config.time_to_keep_mempool_txs, + mempool_detector.clone(), + ))); let subscriber = OrderPoolSubscriber { orderpool: orderpool.clone(), }; @@ -279,6 +284,7 @@ where let txpool_fetcher = txpool_fetcher::subscribe_to_txpool_with_blobs( config.clone(), order_sender.clone(), + mempool_detector.clone(), global_cancel.clone(), ) .await?; diff --git a/crates/rbuilder/src/live_builder/order_input/orderpool.rs b/crates/rbuilder/src/live_builder/order_input/orderpool.rs index a3360d383..eb268b330 100644 --- a/crates/rbuilder/src/live_builder/order_input/orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/orderpool.rs @@ -1,4 +1,5 @@ use super::{ + mempool_txs_detector::MempoolTxsDetector, order_sink::{OrderPoolCommand, OrderSender2OrderSink}, replaceable_order_sink::ReplaceableOrderSink, ReplaceableOrderPoolCommand, @@ -94,11 +95,15 @@ pub struct OrderPool { next_sink_id: u64, /// After this time a mempool tx is dropped. time_to_keep_mempool_txs: Duration, + mempool_detector: Arc, } impl OrderPool { /// Instantiate a new OrderPool. - pub fn new(time_to_keep_mempool_txs: Duration) -> Self { + pub fn new( + time_to_keep_mempool_txs: Duration, + mempool_detector: Arc, + ) -> Self { OrderPool { mempool_txs: Vec::new(), bundles_by_target_block: HashMap::default(), @@ -109,6 +114,7 @@ impl OrderPool { bundle_cancellations: Default::default(), time_to_keep_mempool_txs, mempool_txs_size: 0, + mempool_detector, } } @@ -266,6 +272,10 @@ impl OrderPool { Self::must_retain_order(time, order, new_state, &self.time_to_keep_mempool_txs); if !retain { self.mempool_txs_size -= Self::measure_tx(order); + if let Order::Tx(mempool_tx) = order.as_ref() { + self.mempool_detector + .remove_tx(mempool_tx.tx_with_blobs.hash()); + } } retain }); diff --git a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs index d3b794fed..3c47a2e8f 100644 --- a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs +++ b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs @@ -23,6 +23,7 @@ use tracing::{error, info, trace}; pub async fn subscribe_to_txpool_with_blobs( config: OrderInputConfig, results: mpsc::Sender, + mempool_detector: Arc, global_cancel: CancellationToken, ) -> eyre::Result> { let mempool = config @@ -69,7 +70,6 @@ pub async fn subscribe_to_txpool_with_blobs( continue; } }; - let tx = MempoolTx::new(tx_with_blobs); let order = Order::Tx(tx); let parse_duration = start.elapsed(); @@ -82,7 +82,9 @@ pub async fn subscribe_to_txpool_with_blobs( .send_timeout(orderpool_command, config.results_channel_timeout) .await { - Ok(()) => {} + Ok(()) => { + mempool_detector.add_tx(tx_hash); + } Err(SendTimeoutError::Timeout(_)) => { error!("Failed to send txpool tx to results channel, timeout"); } @@ -140,6 +142,9 @@ mod test { ..OrderInputConfig::default_e2e() }, sender, + Arc::new( + crate::live_builder::order_input::mempool_txs_detector::MempoolTxsDetector::new(), + ), CancellationToken::new(), ) .await