Skip to content

chore: minor cleanup flashblocks logic #357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 42 additions & 55 deletions crates/rollup-boost/src/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::outbound::WebSocketPublisher;
use super::primitives::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1,
};
use crate::RpcClientError;
use crate::flashblocks::metrics::FlashblocksServiceMetrics;
use crate::{
ClientResult, EngineApiExt, NewPayload, OpExecutionPayloadEnvelope, PayloadVersion, RpcClient,
Expand All @@ -21,6 +20,7 @@ use op_alloy_rpc_types_engine::{
};
use reth_optimism_payload_builder::payload_id_optimism;
use serde::{Deserialize, Serialize};
use std::io;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::RwLock;
Expand All @@ -41,12 +41,6 @@ pub enum FlashblocksError {
MissingPayload,
}

impl From<FlashblocksError> for RpcClientError {
fn from(err: FlashblocksError) -> Self {
RpcClientError::InvalidPayload(err.to_string())
}
}

#[derive(Debug, Deserialize, Serialize)]
struct FlashbotsMessage {
method: String,
Expand All @@ -69,30 +63,28 @@ pub struct FlashblockBuilder {

impl FlashblockBuilder {
pub fn new() -> Self {
Self {
base: None,
flashblocks: Vec::new(),
}
Self::default()
}

pub fn extend(&mut self, payload: FlashblocksPayloadV1) -> Result<(), FlashblocksError> {
tracing::debug!(message = "Extending payload", payload_id = %payload.payload_id, index = payload.index, has_base=payload.base.is_some());

// Check base payload rules
match (payload.index, payload.base) {
// First payload must have a base
(0, None) => return Err(FlashblocksError::MissingBasePayload),
(0, Some(base)) => self.base = Some(base),
// Subsequent payloads must have no base
(_, Some(_)) => return Err(FlashblocksError::UnexpectedBasePayload),
(_, None) => {} // Non-zero index without base is fine
}

// Validate the index is contiguous
if payload.index != self.flashblocks.len() as u64 {
return Err(FlashblocksError::InvalidIndex);
}

// Check base payload rules
if payload.index == 0 {
Copy link
Collaborator

@ferranbt ferranbt Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to add a test for this before rewriting it to make sure the migration works as expected. I can do this today.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if let Some(base) = payload.base {
self.base = Some(base)
} else {
return Err(FlashblocksError::MissingBasePayload);
}
} else if payload.base.is_some() {
return Err(FlashblocksError::UnexpectedBasePayload);
}

// Update latest diff and accumulate transactions and withdrawals
self.flashblocks.push(payload.diff);

Expand All @@ -118,17 +110,14 @@ impl FlashblockBuilder {
.last()
.ok_or(FlashblocksError::MissingDelta)?;

let transactions = self
.flashblocks
.iter()
.flat_map(|diff| diff.transactions.clone())
.collect();

let withdrawals = self
.flashblocks
.iter()
.flat_map(|diff| diff.withdrawals.clone())
.collect();
let (transactions, withdrawals) = self.flashblocks.iter().fold(
(Vec::new(), Vec::new()),
|(mut transactions, mut withdrawals), delta| {
transactions.extend(delta.transactions.clone());
withdrawals.extend(delta.withdrawals.clone());
(transactions, withdrawals)
},
);

let withdrawals_root = diff.withdrawals_root;

Expand Down Expand Up @@ -200,7 +189,7 @@ pub struct FlashblocksService {
}

impl FlashblocksService {
pub fn new(client: RpcClient, outbound_addr: SocketAddr) -> eyre::Result<Self> {
pub fn new(client: RpcClient, outbound_addr: SocketAddr) -> io::Result<Self> {
Copy link
Collaborator

@SozinM SozinM Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using io in here?
We should keep eyre (or better use anyhow)
FlashblocksService::new should not return specialized io error IMO

Copy link
Collaborator Author

@0xKitsune 0xKitsune Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used io::Result here because the only fallible operation in FlashblocksService::new() is constructing the WebSocketPublisher which returns an io::Result. Since this is the sole error source, returning the specific error type is more explicit and avoids unnecessary dynamic error wrapping and keeps the API clear. Lmk if I am overlooking something but I do not see a downside in this case to using io::Result as it accurately reflects exactly what can fail.

With that being said I understand the argument for using eyre or anyhow for consistency or simplicity but I chose to go with the above approach to prefer being explicit. If there is a strong preference go with eyre, Im happy to adjust.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is do prefer general approach to errors, because you could always specialize later in case you really need to
I think we could ask some more people on that matter, @ferranbt @0xOsiris wdyt?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also main concern that if we add something to the new we will need to change return types.
At some point i was thinking of adding redis to keep traces ids between builder and boost. It that case i would need to change to general type again, because redis error won't cast into io (maybe it would)

let ws_pub = WebSocketPublisher::new(outbound_addr)?.into();

Ok(Self {
Expand All @@ -216,7 +205,7 @@ impl FlashblocksService {
&self,
version: PayloadVersion,
payload_id: PayloadId,
) -> Result<Option<OpExecutionPayloadEnvelope>, FlashblocksError> {
) -> Result<OpExecutionPayloadEnvelope, FlashblocksError> {
// Check that we have flashblocks for correct payload
if *self.current_payload_id.read().await != payload_id {
// We have outdated `current_payload_id` so we should fallback to get_payload
Expand All @@ -237,7 +226,7 @@ impl FlashblocksService {
std::mem::replace(&mut *builder, FlashblockBuilder::new()).into_envelope(version)?
};

Ok(Some(payload))
Ok(payload)
}

pub async fn set_current_payload_id(&self, payload_id: PayloadId) {
Expand Down Expand Up @@ -314,12 +303,13 @@ impl EngineApiExt for FlashblocksService {
let payload_id = payload_id_optimism(&fork_choice_state.head_block_hash, attr, 3);
self.set_current_payload_id(payload_id).await;
}
let result = self

let resp = self
.client
.fork_choice_updated_v3(fork_choice_state, payload_attributes)
.await?;

if let Some(payload_id) = result.payload_id {
if let Some(payload_id) = resp.payload_id {
let current_payload = *self.current_payload_id.read().await;
if current_payload != payload_id {
tracing::error!(
Expand All @@ -334,7 +324,7 @@ impl EngineApiExt for FlashblocksService {
} else {
tracing::debug!(message = "Forkchoice updated with no payload ID");
}
Ok(result)
Ok(resp)
}

async fn new_payload(&self, new_payload: NewPayload) -> ClientResult<PayloadStatus> {
Expand All @@ -347,22 +337,19 @@ impl EngineApiExt for FlashblocksService {
version: PayloadVersion,
) -> ClientResult<OpExecutionPayloadEnvelope> {
// First try to get the best flashblocks payload from the builder if it exists

match self.get_best_payload(version, payload_id).await {
Ok(Some(payload)) => {
Ok(payload) => {
info!(message = "Returning fb payload");
return Ok(payload);
}
Ok(None) => {
info!(message = "No flashblocks payload available");
Ok(payload)
}
Err(e) => {
error!(message = "Error getting fb best payload", error = %e);
error!(message = "Error getting fb best payload, falling back on client", error = %e);
info!(message = "Falling back to get_payload on client", payload_id = %payload_id);
let result = self.client.get_payload(payload_id, version).await?;
Ok(result)
}
}

info!(message = "Falling back to get_payload on client", payload_id = %payload_id);
let result = self.client.get_payload(payload_id, version).await?;
Ok(result)
}

async fn get_block_by_number(
Expand Down Expand Up @@ -460,28 +447,28 @@ mod tests {
assert!(result.is_err());
assert_eq!(result.unwrap_err(), FlashblocksError::MissingBasePayload);

// Error: First payload must have index 0
// Ok: First payload is correct if it has base and index 0
let result = builder.extend(FlashblocksPayloadV1 {
payload_id: PayloadId::default(),
index: 1,
index: 0,
base: Some(ExecutionPayloadBaseV1 {
..Default::default()
}),
..Default::default()
});
assert!(result.is_err());
assert_eq!(result.unwrap_err(), FlashblocksError::UnexpectedBasePayload);
assert!(result.is_ok());

// Ok: First payload is correct if it has base and index 0
// Error: First payload must have index 0
let result = builder.extend(FlashblocksPayloadV1 {
payload_id: PayloadId::default(),
index: 0,
index: 1,
base: Some(ExecutionPayloadBaseV1 {
..Default::default()
}),
..Default::default()
});
assert!(result.is_ok());
assert!(result.is_err());
assert_eq!(result.unwrap_err(), FlashblocksError::UnexpectedBasePayload);

// Error: Second payload must have a follow-up index
let result = builder.extend(FlashblocksPayloadV1 {
Expand Down
Loading