Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,125 changes: 975 additions & 150 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,6 @@ opentelemetry = { version = "0.31", features = ["trace"] }

# Base Path
concurrent-queue = "2.5.0"
tips-core = { git = "https://github.com/base/tips", rev = "c08eaa4fe10c26de8911609b41ddab4918698325", default-features = false }
tips-core = { git = "https://github.com/base/tips", rev = "d571797179fd0cc9e932373ba9a4eb5c2be0c0c9", default-features = false }
tips-audit = { git = "https://github.com/base/tips", rev = "d571797179fd0cc9e932373ba9a4eb5c2be0c0c9" }
rdkafka = { version = "0.37" }
2 changes: 2 additions & 0 deletions crates/op-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ reqwest = "0.12.23"
k256 = "0.13.4"

rollup-boost.workspace = true
tips-audit.workspace = true
rdkafka.workspace = true

nanoid = { version = "0.4", optional = true }
reth-ipc = { workspace = true, optional = true }
Expand Down
19 changes: 19 additions & 0 deletions crates/op-rbuilder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,25 @@ pub struct OpRbuilderArgs {
)]
pub resource_metering_buffer_size: usize,

/// Buffer size for backrun bundles (LRU eviction when full)
#[arg(long = "builder.backrun-bundle-buffer-size", default_value = "10000")]
pub backrun_bundle_buffer_size: usize,

/// Path to Kafka properties file for audit events (enables Kafka audit if set)
#[arg(
long = "builder.audit-kafka-properties",
env = "AUDIT_KAFKA_PROPERTIES_FILE"
)]
pub audit_kafka_properties: Option<String>,

/// Kafka topic for audit events
#[arg(
long = "builder.audit-kafka-topic",
env = "AUDIT_KAFKA_TOPIC",
default_value = "tips-audit"
)]
pub audit_kafka_topic: String,

/// Path to builder playgorund to automatically start up the node connected to it
#[arg(
long = "builder.playground",
Expand Down
122 changes: 119 additions & 3 deletions crates/op-rbuilder/src/builders/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ use reth_revm::{State, context::Block};
use reth_transaction_pool::{BestTransactionsAttributes, PoolTransaction};
use revm::{DatabaseCommit, context::result::ResultAndState, interpreter::as_u64_saturated};
use std::{sync::Arc, time::Instant};
use tips_audit::BundleEvent;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace};
use tracing::{debug, info, trace, warn};

use crate::{
bundles::AuditSender,
gas_limiter::AddressGasLimiter,
metrics::OpRBuilderMetrics,
primitives::reth::{ExecutionInfo, TxnExecutionResult},
Expand Down Expand Up @@ -80,6 +82,10 @@ pub struct OpPayloadBuilderCtx<ExtraCtx: Debug + Default = ()> {
pub address_gas_limiter: AddressGasLimiter,
/// Per transaction resource metering information
pub resource_metering: ResourceMetering,
/// Backrun bundle store for storing backrun transactions
pub backrun_bundle_store: crate::bundles::BackrunBundleStore,
/// Audit event channel for backrun bundle lifecycle tracking
pub audit_tx: Option<AuditSender>,
}

impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
Expand All @@ -91,6 +97,19 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
Self { extra_ctx, ..self }
}

/// Send an audit event if the audit channel is configured
fn send_audit_event(&self, event: BundleEvent) {
if let Some(ref audit_tx) = self.audit_tx {
if let Err(e) = audit_tx.send(event) {
warn!(
target: "payload_builder",
error = %e,
"Failed to send audit event"
);
}
}
}

/// Returns the parent block the payload will be build on.
pub fn parent(&self) -> &SealedHeader {
&self.config.parent_header
Expand Down Expand Up @@ -433,7 +452,7 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
is_bundle_tx && !reverted_hashes.unwrap().contains(&tx_hash);

let log_txn = |result: TxnExecutionResult| {
debug!(
info!(
target: "payload_builder",
message = "Considering transaction",
tx_hash = ?tx_hash,
Expand All @@ -445,6 +464,17 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {

num_txs_considered += 1;

// Look up bundle_id for this tx (registered via base_txBundleId RPC)
let bundle_id = self.backrun_bundle_store.get_tx_bundle_id(&tx_hash);

// Emit StartExecuting audit event - tx is about to be executed
self.send_audit_event(BundleEvent::StartExecuting {
bundle_id,
tx_hash,
block_number: self.block_number(),
timestamp_ms: chrono::Utc::now().timestamp_millis(),
});

let _resource_usage = self.resource_metering.get(&tx_hash);

// TODO: ideally we should get this from the txpool stream
Expand Down Expand Up @@ -543,7 +573,9 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
continue;
}

if result.is_success() {
let is_success = result.is_success();

if is_success {
log_txn(TxnExecutionResult::Success);
num_txs_simulated_success += 1;
self.metrics.successful_tx_gas_used.record(gas_used as f64);
Expand Down Expand Up @@ -600,6 +632,90 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
// append sender and transaction to the respective lists
info.executed_senders.push(tx.signer());
info.executed_transactions.push(tx.into_inner());

// Emit Executed audit event - tx successfully executed and committed
self.send_audit_event(BundleEvent::Executed {
bundle_id,
tx_hash,
block_number: self.block_number(),
gas_used,
timestamp_ms: chrono::Utc::now().timestamp_millis(),
});

// Execute backrun bundles for this transaction if it succeeded
if is_success && let Some(backrun_bundles) = self.backrun_bundle_store.get(&tx_hash) {
self.metrics.backrun_target_txs_found_total.increment(1);

for stored_bundle in backrun_bundles {
let bundle_id = stored_bundle.bundle_id;
for backrun_tx in stored_bundle.backrun_txs {
let ResultAndState { result, state } = match evm.transact(&backrun_tx) {
Ok(res) => res,
Err(err) => {
return Err(PayloadBuilderError::evm(err));
}
};

let backrun_gas_used = result.gas_used();
let is_backrun_success = result.is_success();

if !is_backrun_success {
info!(message = "Backrun transaction reverted", tx_hash = ?backrun_tx.tx_hash(), bundle_id = %bundle_id);

// Emit BackrunBundleExecuted audit event for reverted tx
self.send_audit_event(BundleEvent::BackrunBundleExecuted {
bundle_id,
target_tx_hash: tx_hash,
backrun_tx_hash: backrun_tx.tx_hash(),
block_number: self.block_number(),
gas_used: backrun_gas_used,
success: false,
timestamp_ms: chrono::Utc::now().timestamp_millis(),
});

continue;
}

info!(message = "Backrun transaction succeeded", tx_hash = ?backrun_tx.tx_hash(), bundle_id = %bundle_id);

info.cumulative_gas_used += backrun_gas_used;
info.cumulative_da_bytes_used += backrun_tx.encoded_2718().len() as u64;

let ctx = ReceiptBuilderCtx {
tx: backrun_tx.inner(),
evm: &evm,
result,
state: &state,
cumulative_gas_used: info.cumulative_gas_used,
};
info.receipts.push(self.build_receipt(ctx, None));

// commit changes
evm.db_mut().commit(state);

// Emit BackrunBundleExecuted audit event for successful tx
self.send_audit_event(BundleEvent::BackrunBundleExecuted {
bundle_id,
target_tx_hash: tx_hash,
backrun_tx_hash: backrun_tx.tx_hash(),
block_number: self.block_number(),
gas_used: backrun_gas_used,
success: true,
timestamp_ms: chrono::Utc::now().timestamp_millis(),
});

// update add to total fees
let miner_fee = backrun_tx
.effective_tip_per_gas(base_fee)
.expect("fee is always valid; execution succeeded");
info.total_fees += U256::from(miner_fee) * U256::from(backrun_gas_used);

// append sender and transaction to the respective lists
info.executed_senders.push(backrun_tx.signer());
info.executed_transactions.push(backrun_tx.into_inner());
}
}
}
}

let payload_transaction_simulation_time = execute_txs_start_time.elapsed();
Expand Down
10 changes: 10 additions & 0 deletions crates/op-rbuilder/src/builders/flashblocks/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
builders::{BuilderConfig, OpPayloadBuilderCtx, flashblocks::FlashblocksConfig},
bundles::{AuditSender, BackrunBundleStore},
gas_limiter::{AddressGasLimiter, args::GasLimiterArgs},
metrics::OpRBuilderMetrics,
resource_metering::ResourceMetering,
Expand Down Expand Up @@ -32,6 +33,10 @@ pub(super) struct OpPayloadSyncerCtx {
metrics: Arc<OpRBuilderMetrics>,
/// Resource metering tracking
resource_metering: ResourceMetering,
/// Backrun bundle store
backrun_bundle_store: BackrunBundleStore,
/// Audit event channel
audit_tx: Option<AuditSender>,
}

impl OpPayloadSyncerCtx {
Expand All @@ -45,13 +50,16 @@ impl OpPayloadSyncerCtx {
Client: ClientBounds,
{
let chain_spec = client.chain_spec();
let audit_tx = builder_config.backrun_bundle_store.audit_tx();
Ok(Self {
evm_config,
da_config: builder_config.da_config.clone(),
chain_spec,
max_gas_per_txn: builder_config.max_gas_per_txn,
metrics,
resource_metering: builder_config.resource_metering,
backrun_bundle_store: builder_config.backrun_bundle_store,
audit_tx,
})
}

Expand Down Expand Up @@ -85,6 +93,8 @@ impl OpPayloadSyncerCtx {
max_gas_per_txn: self.max_gas_per_txn,
address_gas_limiter: AddressGasLimiter::new(GasLimiterArgs::default()),
resource_metering: self.resource_metering.clone(),
backrun_bundle_store: self.backrun_bundle_store.clone(),
audit_tx: self.audit_tx,
}
}
}
2 changes: 2 additions & 0 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ where
max_gas_per_txn: self.config.max_gas_per_txn,
address_gas_limiter: self.address_gas_limiter.clone(),
resource_metering: self.config.resource_metering.clone(),
backrun_bundle_store: self.config.backrun_bundle_store.clone(),
audit_tx: self.config.backrun_bundle_store.audit_tx(),
})
}

Expand Down
8 changes: 7 additions & 1 deletion crates/op-rbuilder/src/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod flashblocks;
mod generator;
mod standard;

use crate::resource_metering::ResourceMetering;
use crate::{bundles::BackrunBundleStore, resource_metering::ResourceMetering};
pub use builder_tx::{
BuilderTransactionCtx, BuilderTransactionError, BuilderTransactions, InvalidContractDataError,
SimulationSuccessResult, get_balance, get_nonce,
Expand Down Expand Up @@ -130,6 +130,9 @@ pub struct BuilderConfig<Specific: Clone> {

/// Resource metering context
pub resource_metering: ResourceMetering,

/// Backrun bundle store for storing backrun transactions
pub backrun_bundle_store: BackrunBundleStore,
}

impl<S: Debug + Clone> core::fmt::Debug for BuilderConfig<S> {
Expand All @@ -152,6 +155,7 @@ impl<S: Debug + Clone> core::fmt::Debug for BuilderConfig<S> {
.field("specific", &self.specific)
.field("max_gas_per_txn", &self.max_gas_per_txn)
.field("gas_limiter_config", &self.gas_limiter_config)
.field("backrun_bundle_store", &self.backrun_bundle_store)
.finish()
}
}
Expand All @@ -171,6 +175,7 @@ impl<S: Default + Clone> Default for BuilderConfig<S> {
max_gas_per_txn: None,
gas_limiter_config: GasLimiterArgs::default(),
resource_metering: ResourceMetering::default(),
backrun_bundle_store: BackrunBundleStore::default(),
}
}
}
Expand All @@ -197,6 +202,7 @@ where
args.enable_resource_metering,
args.resource_metering_buffer_size,
),
backrun_bundle_store: BackrunBundleStore::new(args.backrun_bundle_buffer_size),
specific: S::try_from(args)?,
})
}
Expand Down
2 changes: 2 additions & 0 deletions crates/op-rbuilder/src/builders/standard/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ where
max_gas_per_txn: self.config.max_gas_per_txn,
address_gas_limiter: self.address_gas_limiter.clone(),
resource_metering: self.config.resource_metering.clone(),
backrun_bundle_store: self.config.backrun_bundle_store.clone(),
audit_tx: self.config.backrun_bundle_store.audit_tx(),
};

let builder = OpBuilder::new(best);
Expand Down
Loading
Loading