Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/rbuilder/src/backtest/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ where
Ok(ctx)
}

/// @Pending: change available_orders to some struct that allows to tell the difference between
/// mempool txs and private txs.
pub fn backtest_prepare_orders_from_building_context<P>(
ctx: BlockBuildingContext,
available_orders: Vec<OrdersWithTimestamp>,
Expand All @@ -100,7 +102,10 @@ where
.map(|order| Arc::clone(&order.order))
.collect();
for order in &orders {
ctx.mempool_tx_detector.add_tx(order);
if let Order::Tx(mempool_tx) = order.as_ref() {
ctx.mempool_tx_detector
.add_tx(mempool_tx.tx_with_blobs.hash());
}
}

let (sim_orders, sim_errors) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use std::{

use alloy_primitives::{utils::format_ether, I256, U256};

use crate::{
building::{builders::block_building_helper::BlockBuildingHelper, ThreadBlockBuildingContext},
live_builder::order_input::mempool_txs_detector::MempoolTxsDetector,
use crate::building::{
builders::block_building_helper::BlockBuildingHelper, ThreadBlockBuildingContext,
};
use rbuilder_primitives::{order_statistics::OrderStatistics, OrderId, SimulatedOrder};

Expand Down Expand Up @@ -95,10 +94,13 @@ impl<'a> BlockBuildingHelperStatsLogger<'a> {

pub fn print(&self, orders: Vec<Arc<SimulatedOrder>>) {
let mut order_id_to_order = HashMap::new();
let mempool_txs_detector = MempoolTxsDetector::new();
let mempool_txs_detector = self
.block_building_helper
.building_context()
.mempool_tx_detector
.clone();
for sim_order in &orders {
order_id_to_order.insert(sim_order.id(), sim_order.clone());
mempool_txs_detector.add_tx(&sim_order.order);
}

println!(
Expand Down
9 changes: 4 additions & 5 deletions crates/rbuilder/src/building/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl BlockBuildingContext {
faster_finalize: bool,
mev_blocker_price: U256,
adjustment_fee_payers: ahash::HashSet<Address>,
mempool_tx_detector: Arc<MempoolTxsDetector>,
) -> Option<BlockBuildingContext> {
let attributes = EthPayloadBuilderAttributes::try_new(
attributes.data.parent_block_hash,
Expand Down Expand Up @@ -219,7 +220,7 @@ impl BlockBuildingContext {
shared_cached_reads: Default::default(),
tx_execution_cache: Arc::new(TxExecutionCache::new(evm_caching_enable)),
max_blob_gas_per_block,
mempool_tx_detector: Arc::new(MempoolTxsDetector::new()),
mempool_tx_detector,
faster_finalize,
mev_blocker_price,
adjustment_fee_payers,
Expand Down Expand Up @@ -1319,9 +1320,7 @@ mod test {
let detector = MempoolTxsDetector::new();
let mut data_gen = TestDataGenerator::default();
let tx1 = data_gen.create_tx_with_blobs_nonce(Default::default());
detector.add_tx(&Order::Tx(MempoolTx {
tx_with_blobs: tx1.clone(),
}));
detector.add_tx(tx1.hash());
let tx2 = data_gen.create_tx_with_blobs_nonce(Default::default());
let profit_1 = I256::unchecked_from(1000);
let profit_2 = I256::unchecked_from(10000);
Expand Down Expand Up @@ -1371,7 +1370,7 @@ mod test {
let order = Order::Tx(MempoolTx {
tx_with_blobs: tx.clone(),
});
detector.add_tx(&order);
detector.add_tx(tx.hash());
let profit = I256::unchecked_from(1000);
let order_ok = OrderOk {
coinbase_profit: profit.unsigned_abs(),
Expand Down
5 changes: 4 additions & 1 deletion crates/rbuilder/src/building/testing/test_chain_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
building::BlockBuildingContext,
live_builder::block_list_provider::BlockList,
live_builder::{
block_list_provider::BlockList, order_input::mempool_txs_detector::MempoolTxsDetector,
},
provider::RootHasher,
roothash::RootHashContext,
utils::{RootHasherImpl, Signer},
Expand Down Expand Up @@ -420,6 +422,7 @@ impl TestBlockContextBuilder {
true,
self.mev_block_price,
Default::default(),
Arc::new(MempoolTxsDetector::new()),
)
.unwrap()
}
Expand Down
4 changes: 3 additions & 1 deletion crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
order_flow_tracing::order_flow_tracer_manager::{
NullOrderFlowTracerManager, OrderFlowTracerManager, OrderFlowTracerManagerImpl,
},
order_input::OrderInputConfig,
order_input::{mempool_txs_detector::MempoolTxsDetector, OrderInputConfig},
process_killer::ProcessKiller,
LiveBuilder,
},
Expand Down Expand Up @@ -278,6 +278,8 @@ impl BaseConfig {
faster_finalize: self.faster_finalize,
order_flow_tracer_manager,
order_journal_observer_factory: Box::new(NullOrderJournalObserverFactory {}),

mempool_detector: Arc::new(MempoolTxsDetector::new()),
})
}

Expand Down
14 changes: 3 additions & 11 deletions crates/rbuilder/src/live_builder/building/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ where

/// Connects OrdersForBlock (source of orders) ->
/// [Optional] OrderFlowTracerManager provided tracer ->
/// ReplaceableOrderStreamSniffer (notifies mempool txs to MempoolTxsDetector) ->
/// BlobTypeOrderFilter (filters out Orders with incorrect blobs (pre/post fusaka)) ->
/// OrderReplacementManager (Handles cancellations and replacements) -> Simulations and calls start_building_job
pub fn start_block_building(
Expand Down Expand Up @@ -135,17 +134,10 @@ where
)))
};

let mempool_txs_detector_sniffer =
order_input::mempool_txs_detector::ReplaceableOrderStreamSniffer::new(
blob_type_order_filter,
block_ctx.mempool_tx_detector.clone(),
);

// order_flow_tracer_manager may add some extra ReplaceableOrderSink on the chain.
let (sim_tracer, order_flow_input) = self.order_flow_tracer_manager.create_tracers(
payload.slot_block_id(),
Box::new(mempool_txs_detector_sniffer),
);
let (sim_tracer, order_flow_input) = self
.order_flow_tracer_manager
.create_tracers(payload.slot_block_id(), blob_type_order_filter);

// sink removal is automatic via OrderSink::is_alive false
let _block_sub = self
Expand Down
16 changes: 13 additions & 3 deletions crates/rbuilder/src/live_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use building::BlockBuildingPool;
use eyre::Context;
use futures::{stream::FuturesUnordered, StreamExt};
use jsonrpsee::RpcModule;
use order_input::ReplaceableOrderPoolCommand;
use order_input::{mempool_txs_detector::MempoolTxsDetector, ReplaceableOrderPoolCommand};
use payload_events::{InternalPayloadId, MevBoostSlotDataGenerator};
use rbuilder_primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs};
use reth::transaction_pool::{
Expand Down Expand Up @@ -147,6 +147,8 @@ where

pub order_flow_tracer_manager: Box<dyn OrderFlowTracerManager>,
pub order_journal_observer_factory: Box<dyn OrderJournalObserverFactory + Send + Sync>,

pub mempool_detector: Arc<MempoolTxsDetector>,
}

impl<P> LiveBuilder<P>
Expand Down Expand Up @@ -230,6 +232,7 @@ where

let (header_sender, header_receiver) = mpsc::channel(CLEAN_TASKS_CHANNEL_SIZE);

let mempool_detector = self.mempool_detector.clone();
let orderpool_subscriber = {
let (handle, sub) = start_orderpool_jobs(
self.order_input_config,
Expand All @@ -239,6 +242,7 @@ where
self.orderpool_sender,
self.orderpool_receiver,
header_receiver,
mempool_detector.clone(),
)
.await?;
inner_jobs_handles.push(handle);
Expand Down Expand Up @@ -370,6 +374,7 @@ where
.iter()
.filter_map(|(_, r)| r.adjustment_fee_payer)
.collect(),
mempool_detector.clone(),
) {
mark_building_started(block_ctx.timestamp());
builder_pool.start_block_building(
Expand Down Expand Up @@ -401,13 +406,14 @@ where
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
let detector = self.mempool_detector.clone();
// Initialize the orderpool with every item in the reth pool.
for tx in pool
.all_transactions()
.pending_recovered()
.chain(pool.all_transactions().queued_recovered())
{
try_send_to_orderpool(tx, self.orderpool_sender.clone(), pool.clone()).await;
try_send_to_orderpool(tx, self.orderpool_sender.clone(), pool.clone(), &detector).await;
}

// Subscribe to new transactions in-process.
Expand All @@ -416,7 +422,7 @@ where
tokio::spawn(async move {
while let Some(e) = recv.recv().await {
let tx = e.transaction.transaction.transaction().clone();
try_send_to_orderpool(tx, orderpool_sender.clone(), pool.clone()).await;
try_send_to_orderpool(tx, orderpool_sender.clone(), pool.clone(), &detector).await;
}
});

Expand Down Expand Up @@ -537,16 +543,20 @@ async fn try_send_to_orderpool<V, T, S>(
tx: Recovered<TransactionSigned>,
orderpool_sender: mpsc::Sender<ReplaceableOrderPoolCommand>,
pool: Pool<V, T, S>,
mempool_detector: &Arc<MempoolTxsDetector>,
) where
V: TransactionValidator<Transaction = EthPooledTransaction> + 'static,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
match TransactionSignedEcRecoveredWithBlobs::try_from_tx_without_blobs_and_pool(tx, pool) {
Ok(tx) => {
let tx_hash = tx.hash();
mempool_detector.add_tx(tx_hash);
let order = Order::Tx(MempoolTx::new(tx));
let command = ReplaceableOrderPoolCommand::Order(Arc::new(order));
if let Err(e) = orderpool_sender.send(command).await {
mempool_detector.remove_tx(tx_hash);
error!("Error sending order to orderpool: {:#}", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,8 @@
use std::sync::Arc;

use ahash::RandomState;
use alloy_primitives::TxHash;
use dashmap::DashSet;

use rbuilder_primitives::{BundleReplacementData, Order, TransactionSignedEcRecoveredWithBlobs};

use super::replaceable_order_sink::ReplaceableOrderSink;

/// Get's in the middle of a ReplaceableOrder stream a feeds a MempoolTxsDetector.
#[derive(Debug)]
pub struct ReplaceableOrderStreamSniffer {
detector: Arc<MempoolTxsDetector>,
sink: Box<dyn ReplaceableOrderSink>,
}

impl ReplaceableOrderStreamSniffer {
pub fn new(sink: Box<dyn ReplaceableOrderSink>, detector: Arc<MempoolTxsDetector>) -> Self {
Self { detector, sink }
}

pub fn detector(&self) -> Arc<MempoolTxsDetector> {
self.detector.clone()
}
}

impl ReplaceableOrderSink for ReplaceableOrderStreamSniffer {
fn insert_order(&mut self, order: Arc<Order>) -> bool {
self.detector.add_tx(&order);
self.sink.insert_order(order)
}

fn remove_bundle(&mut self, replacement_data: BundleReplacementData) -> bool {
self.sink.remove_bundle(replacement_data)
}

fn is_alive(&self) -> bool {
self.sink.is_alive()
}
}
use rbuilder_primitives::TransactionSignedEcRecoveredWithBlobs;

/// Given a TransactionSignedEcRecoveredWithBlobs answers if the tx is from the mempool or not.
/// Current implementation is super simple, it just checks the tx hash against a set of hashes.
Expand All @@ -54,10 +18,12 @@ impl MempoolTxsDetector {
}
}

pub fn add_tx(&self, order: &Order) {
if let Order::Tx(mempool_tx) = order {
self.mempool_txs.insert(mempool_tx.tx_with_blobs.hash());
}
pub fn add_tx(&self, hash: TxHash) {
self.mempool_txs.insert(hash);
}

pub fn remove_tx(&self, hash: TxHash) {
self.mempool_txs.remove(&hash);
}

pub fn is_mempool(&self, tx: &TransactionSignedEcRecoveredWithBlobs) -> bool {
Expand Down
8 changes: 7 additions & 1 deletion crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl ReplaceableOrderPoolCommand {
/// - Clean up task to remove old stuff.
///
/// @Pending reengineering to modularize rpc, extra_rpc here is a patch to upgrade the created rpc server.
#[allow(clippy::too_many_arguments)]
pub async fn start_orderpool_jobs<P>(
config: OrderInputConfig,
provider_factory: P,
Expand All @@ -241,6 +242,7 @@ pub async fn start_orderpool_jobs<P>(
order_sender: mpsc::Sender<ReplaceableOrderPoolCommand>,
order_receiver: mpsc::Receiver<ReplaceableOrderPoolCommand>,
header_receiver: mpsc::Receiver<Header>,
mempool_detector: Arc<mempool_txs_detector::MempoolTxsDetector>,
) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)>
where
P: StateProviderFactory + 'static,
Expand All @@ -252,7 +254,10 @@ where
warn!("ignore_blobs is set to true, some order input is ignored");
}

let orderpool = Arc::new(Mutex::new(OrderPool::new(config.time_to_keep_mempool_txs)));
let orderpool = Arc::new(Mutex::new(OrderPool::new(
config.time_to_keep_mempool_txs,
mempool_detector.clone(),
)));
let subscriber = OrderPoolSubscriber {
orderpool: orderpool.clone(),
};
Expand All @@ -279,6 +284,7 @@ where
let txpool_fetcher = txpool_fetcher::subscribe_to_txpool_with_blobs(
config.clone(),
order_sender.clone(),
mempool_detector.clone(),
global_cancel.clone(),
)
.await?;
Expand Down
12 changes: 11 additions & 1 deletion crates/rbuilder/src/live_builder/order_input/orderpool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{
mempool_txs_detector::MempoolTxsDetector,
order_sink::{OrderPoolCommand, OrderSender2OrderSink},
replaceable_order_sink::ReplaceableOrderSink,
ReplaceableOrderPoolCommand,
Expand Down Expand Up @@ -94,11 +95,15 @@ pub struct OrderPool {
next_sink_id: u64,
/// After this time a mempool tx is dropped.
time_to_keep_mempool_txs: Duration,
mempool_detector: Arc<MempoolTxsDetector>,
}

impl OrderPool {
/// Instantiate a new OrderPool.
pub fn new(time_to_keep_mempool_txs: Duration) -> Self {
pub fn new(
time_to_keep_mempool_txs: Duration,
mempool_detector: Arc<MempoolTxsDetector>,
) -> Self {
OrderPool {
mempool_txs: Vec::new(),
bundles_by_target_block: HashMap::default(),
Expand All @@ -109,6 +114,7 @@ impl OrderPool {
bundle_cancellations: Default::default(),
time_to_keep_mempool_txs,
mempool_txs_size: 0,
mempool_detector,
}
}

Expand Down Expand Up @@ -266,6 +272,10 @@ impl OrderPool {
Self::must_retain_order(time, order, new_state, &self.time_to_keep_mempool_txs);
if !retain {
self.mempool_txs_size -= Self::measure_tx(order);
if let Order::Tx(mempool_tx) = order.as_ref() {
self.mempool_detector
.remove_tx(mempool_tx.tx_with_blobs.hash());
}
}
retain
});
Expand Down
Loading
Loading