Skip to content

Commit 8d839a6

Browse files
authored
Submit delayed payments to block processor (#742)
1 parent a0073cb commit 8d839a6

File tree

6 files changed

+81
-44
lines changed

6 files changed

+81
-44
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ tracing = "0.1.37"
178178
metrics = { version = "0.24.1" }
179179
ahash = "0.8.6"
180180
time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }
181+
uuid = { version = "1.6.1", features = ["serde", "v5", "v4"] }
181182

182183
eth-sparse-mpt = { path = "crates/eth-sparse-mpt" }
183184
bid-scraper = { path = "crates/bid-scraper" }
@@ -186,3 +187,4 @@ rbuilder-primitives = { path = "crates/rbuilder-primitives" }
186187
rbuilder-config = { path = "crates/rbuilder-config" }
187188
sysperf = { path = "crates/sysperf" }
188189
metrics_macros = { path = "crates/rbuilder/src/telemetry/metrics_macros" }
190+

crates/rbuilder-operator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ clickhouse = { version = "0.12.2", features = ["time", "uuid", "native-tls"] }
5757
futures-util.workspace = true
5858
parking_lot.workspace = true
5959
lazy_static.workspace = true
60+
uuid.workspace = true
6061

6162
[build-dependencies]
6263
built = { version = "0.7.1", features = ["git2", "chrono"] }

crates/rbuilder-operator/src/blocks_processor.rs

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
use alloy_primitives::{BlockHash, U256};
1+
use alloy_primitives::{Address, BlockHash, U256};
22
use exponential_backoff::Backoff;
3-
use jsonrpsee::{
4-
core::{client::ClientT, traits::ToRpcParams},
5-
http_client::{HttpClient, HttpClientBuilder},
6-
};
3+
use jsonrpsee::core::{client::ClientT, traits::ToRpcParams};
74
use rbuilder::{
85
building::BuiltBlockTrace,
96
live_builder::{
@@ -14,7 +11,7 @@ use rbuilder::{
1411
use rbuilder_primitives::{
1512
mev_boost::SubmitBlockRequest,
1613
serialize::{RawBundle, RawShareBundle},
17-
Bundle, Order,
14+
Bundle, Order, OrderId,
1815
};
1916
use reth_primitives::SealedBlock;
2017
use serde::{Deserialize, Serialize};
@@ -23,12 +20,12 @@ use serde_with::{serde_as, DisplayFromStr};
2320
use std::{sync::Arc, time::Duration};
2421
use time::format_description::well_known;
2522
use tracing::{error, warn, Span};
23+
use uuid::Uuid;
2624

2725
use crate::metrics::inc_submit_block_errors;
2826

2927
const BLOCK_PROCESSOR_ERROR_CATEGORY: &str = "block_processor";
30-
const DEFAULT_BLOCK_CONSUME_BUILT_BLOCK_METHOD: &str = "block_consumeBuiltBlockV2";
31-
pub const SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD: &str = "flashbots_consumeBuiltBlockV2";
28+
pub const SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD: &str = "flashbots_consumeBuiltBlockV3";
3229

3330
#[derive(Debug, Serialize, Deserialize)]
3431
#[serde(rename_all = "camelCase")]
@@ -67,6 +64,14 @@ struct BlocksProcessorHeader {
6764
pub number: Option<U256>,
6865
}
6966

67+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
68+
#[serde(rename_all = "camelCase")]
69+
pub struct BlockProcessorDelayedPayments {
70+
pub source: Uuid,
71+
pub value: U256,
72+
pub address: Address,
73+
}
74+
7075
type ConsumeBuiltBlockRequest = (
7176
BlocksProcessorHeader,
7277
String,
@@ -78,6 +83,7 @@ type ConsumeBuiltBlockRequest = (
7883
String,
7984
U256,
8085
U256,
86+
Vec<BlockProcessorDelayedPayments>,
8187
);
8288

8389
/// Struct to avoid copying ConsumeBuiltBlockRequest since HttpClient::request eats the parameter.
@@ -113,22 +119,6 @@ pub struct BlocksProcessorClient<HttpClientType> {
113119
consume_built_block_method: &'static str,
114120
}
115121

116-
impl BlocksProcessorClient<HttpClient> {
117-
pub fn try_from(
118-
url: &str,
119-
max_request_size: u32,
120-
max_concurrent_requests: usize,
121-
) -> eyre::Result<Self> {
122-
Ok(Self {
123-
client: HttpClientBuilder::default()
124-
.max_request_size(max_request_size)
125-
.max_concurrent_requests(max_concurrent_requests)
126-
.build(url)?,
127-
consume_built_block_method: DEFAULT_BLOCK_CONSUME_BUILT_BLOCK_METHOD,
128-
})
129-
}
130-
}
131-
132122
/// RawBundle::encode_no_blobs but more compatible.
133123
fn encode_bundle_for_blocks_processor(mut bundle: Bundle) -> RawBundle {
134124
// set to 0 when none
@@ -187,6 +177,36 @@ impl<HttpClientType: ClientT> BlocksProcessorClient<HttpClientType> {
187177

188178
let used_share_bundles = Self::get_used_sbundles(built_block_trace);
189179

180+
let delayed_payments = built_block_trace
181+
.included_orders
182+
.iter()
183+
.filter_map(|res| {
184+
let delayed_kickback = if let Some(k) = &res.delayed_kickback {
185+
k
186+
} else {
187+
return None;
188+
};
189+
190+
if delayed_kickback.should_pay_in_block {
191+
return None;
192+
}
193+
194+
let bundle_uuid = match res.order.id() {
195+
OrderId::Bundle(uuid) => uuid,
196+
_ => {
197+
error!(order = ?res.order.id(), "Delayed kickback is found for non-bundle");
198+
return None;
199+
}
200+
};
201+
202+
Some(BlockProcessorDelayedPayments {
203+
source: bundle_uuid,
204+
value: delayed_kickback.payout_value,
205+
address: delayed_kickback.recipient,
206+
})
207+
})
208+
.collect();
209+
190210
let params: ConsumeBuiltBlockRequest = (
191211
header,
192212
closed_at,
@@ -198,6 +218,7 @@ impl<HttpClientType: ClientT> BlocksProcessorClient<HttpClientType> {
198218
builder_name,
199219
built_block_trace.true_bid_value,
200220
best_bid_value,
221+
delayed_payments,
201222
);
202223
let request = ConsumeBuiltBlockRequestArc::new(params);
203224
let backoff = backoff();
@@ -375,4 +396,19 @@ mod tests {
375396
dbg!(total_sleep_time);
376397
assert!(total_sleep_time > 40 && total_sleep_time < 90);
377398
}
399+
400+
#[test]
401+
fn test_delayed_payment_serialize() {
402+
let value = BlockProcessorDelayedPayments {
403+
address: alloy_primitives::address!("93Ea7cB31f76B982601321b2A0d93Ec9A948236D"),
404+
value: U256::from(16),
405+
source: Uuid::try_parse("ff7b2232-b30d-4889-9258-c3632ba4bfc0").unwrap(),
406+
};
407+
408+
let value_str = serde_json::to_string(&value).unwrap();
409+
410+
let expected_str = r#"{"source":"ff7b2232-b30d-4889-9258-c3632ba4bfc0","value":"0x10","address":"0x93ea7cb31f76b982601321b2a0d93ec9a948236d"}"#;
411+
412+
assert_eq!(value_str, expected_str);
413+
}
378414
}

crates/rbuilder-operator/src/flashbots_config.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -274,25 +274,22 @@ impl FlashbotsConfig {
274274
block_processor_key: Option<PrivateKeySigner>,
275275
) -> eyre::Result<Option<Box<dyn BidObserver + Send + Sync>>> {
276276
if let Some(url) = &self.blocks_processor_url {
277-
let bid_observer: Box<dyn BidObserver + Send + Sync> =
278-
if let Some(block_processor_key) = block_processor_key {
279-
let client = crate::signed_http_client::create_client(
280-
url,
281-
block_processor_key,
282-
self.blocks_processor_max_request_size_bytes,
283-
self.blocks_processor_max_concurrent_requests,
284-
)?;
285-
let block_processor =
286-
BlocksProcessorClient::new(client, SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD);
287-
Box::new(BlocksProcessorClientBidObserver::new(block_processor))
288-
} else {
289-
let client = BlocksProcessorClient::try_from(
290-
url,
291-
self.blocks_processor_max_request_size_bytes,
292-
self.blocks_processor_max_concurrent_requests,
293-
)?;
294-
Box::new(BlocksProcessorClientBidObserver::new(client))
295-
};
277+
let bid_observer: Box<dyn BidObserver + Send + Sync> = if let Some(
278+
block_processor_key,
279+
) = block_processor_key
280+
{
281+
let client = crate::signed_http_client::create_client(
282+
url,
283+
block_processor_key,
284+
self.blocks_processor_max_request_size_bytes,
285+
self.blocks_processor_max_concurrent_requests,
286+
)?;
287+
let block_processor =
288+
BlocksProcessorClient::new(client, SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD);
289+
Box::new(BlocksProcessorClientBidObserver::new(block_processor))
290+
} else {
291+
eyre::bail!("Unsigned block processing is not supported: if blocks_processor_url is set, a block_processor_key must also be provided");
292+
};
296293
Ok(Some(bid_observer))
297294
} else {
298295
if block_processor_key.is_some() {

crates/rbuilder/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ mev-share-sse.workspace = true
101101
beacon-api-client.workspace = true
102102
ethereum-consensus.workspace = true
103103
jsonrpsee = { version = "0.20.3", features = ["full"] }
104-
uuid = { version = "1.6.1", features = ["serde", "v5", "v4"] }
104+
uuid.workspace = true
105105
prometheus.workspace = true
106106
warp.workspace = true
107107
lazy_static.workspace = true

0 commit comments

Comments
 (0)