-
Notifications
You must be signed in to change notification settings - Fork 49
chore: minor cleanup flashblocks logic #357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ use super::outbound::WebSocketPublisher; | |
use super::primitives::{ | ||
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, | ||
}; | ||
use crate::RpcClientError; | ||
use crate::flashblocks::metrics::FlashblocksServiceMetrics; | ||
use crate::{ | ||
ClientResult, EngineApiExt, NewPayload, OpExecutionPayloadEnvelope, PayloadVersion, RpcClient, | ||
|
@@ -21,6 +20,7 @@ use op_alloy_rpc_types_engine::{ | |
}; | ||
use reth_optimism_payload_builder::payload_id_optimism; | ||
use serde::{Deserialize, Serialize}; | ||
use std::io; | ||
use std::sync::Arc; | ||
use thiserror::Error; | ||
use tokio::sync::RwLock; | ||
|
@@ -41,12 +41,6 @@ pub enum FlashblocksError { | |
MissingPayload, | ||
} | ||
|
||
impl From<FlashblocksError> for RpcClientError { | ||
fn from(err: FlashblocksError) -> Self { | ||
RpcClientError::InvalidPayload(err.to_string()) | ||
} | ||
} | ||
|
||
#[derive(Debug, Deserialize, Serialize)] | ||
struct FlashbotsMessage { | ||
method: String, | ||
|
@@ -69,30 +63,28 @@ pub struct FlashblockBuilder { | |
|
||
impl FlashblockBuilder { | ||
pub fn new() -> Self { | ||
Self { | ||
base: None, | ||
flashblocks: Vec::new(), | ||
} | ||
Self::default() | ||
} | ||
|
||
pub fn extend(&mut self, payload: FlashblocksPayloadV1) -> Result<(), FlashblocksError> { | ||
tracing::debug!(message = "Extending payload", payload_id = %payload.payload_id, index = payload.index, has_base=payload.base.is_some()); | ||
|
||
// Check base payload rules | ||
match (payload.index, payload.base) { | ||
// First payload must have a base | ||
(0, None) => return Err(FlashblocksError::MissingBasePayload), | ||
(0, Some(base)) => self.base = Some(base), | ||
// Subsequent payloads must have no base | ||
(_, Some(_)) => return Err(FlashblocksError::UnexpectedBasePayload), | ||
(_, None) => {} // Non-zero index without base is fine | ||
} | ||
|
||
// Validate the index is contiguous | ||
if payload.index != self.flashblocks.len() as u64 { | ||
return Err(FlashblocksError::InvalidIndex); | ||
} | ||
|
||
// Check base payload rules | ||
if payload.index == 0 { | ||
if let Some(base) = payload.base { | ||
self.base = Some(base) | ||
} else { | ||
return Err(FlashblocksError::MissingBasePayload); | ||
} | ||
} else if payload.base.is_some() { | ||
return Err(FlashblocksError::UnexpectedBasePayload); | ||
} | ||
|
||
// Update latest diff and accumulate transactions and withdrawals | ||
self.flashblocks.push(payload.diff); | ||
|
||
|
@@ -118,17 +110,14 @@ impl FlashblockBuilder { | |
.last() | ||
.ok_or(FlashblocksError::MissingDelta)?; | ||
|
||
let transactions = self | ||
.flashblocks | ||
.iter() | ||
.flat_map(|diff| diff.transactions.clone()) | ||
.collect(); | ||
|
||
let withdrawals = self | ||
.flashblocks | ||
.iter() | ||
.flat_map(|diff| diff.withdrawals.clone()) | ||
.collect(); | ||
let (transactions, withdrawals) = self.flashblocks.iter().fold( | ||
(Vec::new(), Vec::new()), | ||
|(mut transactions, mut withdrawals), delta| { | ||
transactions.extend(delta.transactions.clone()); | ||
withdrawals.extend(delta.withdrawals.clone()); | ||
(transactions, withdrawals) | ||
}, | ||
); | ||
|
||
let withdrawals_root = diff.withdrawals_root; | ||
|
||
|
@@ -200,7 +189,7 @@ pub struct FlashblocksService { | |
} | ||
|
||
impl FlashblocksService { | ||
pub fn new(client: RpcClient, outbound_addr: SocketAddr) -> eyre::Result<Self> { | ||
pub fn new(client: RpcClient, outbound_addr: SocketAddr) -> io::Result<Self> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used With that being said I understand the argument for using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also main concern that if we add something to the new we will need to change return types. |
||
let ws_pub = WebSocketPublisher::new(outbound_addr)?.into(); | ||
|
||
Ok(Self { | ||
|
@@ -216,7 +205,7 @@ impl FlashblocksService { | |
&self, | ||
version: PayloadVersion, | ||
payload_id: PayloadId, | ||
) -> Result<Option<OpExecutionPayloadEnvelope>, FlashblocksError> { | ||
) -> Result<OpExecutionPayloadEnvelope, FlashblocksError> { | ||
// Check that we have flashblocks for correct payload | ||
if *self.current_payload_id.read().await != payload_id { | ||
// We have outdated `current_payload_id` so we should fallback to get_payload | ||
|
@@ -237,7 +226,7 @@ impl FlashblocksService { | |
std::mem::replace(&mut *builder, FlashblockBuilder::new()).into_envelope(version)? | ||
}; | ||
|
||
Ok(Some(payload)) | ||
Ok(payload) | ||
} | ||
|
||
pub async fn set_current_payload_id(&self, payload_id: PayloadId) { | ||
|
@@ -314,12 +303,13 @@ impl EngineApiExt for FlashblocksService { | |
let payload_id = payload_id_optimism(&fork_choice_state.head_block_hash, attr, 3); | ||
self.set_current_payload_id(payload_id).await; | ||
} | ||
let result = self | ||
|
||
let resp = self | ||
.client | ||
.fork_choice_updated_v3(fork_choice_state, payload_attributes) | ||
.await?; | ||
|
||
if let Some(payload_id) = result.payload_id { | ||
if let Some(payload_id) = resp.payload_id { | ||
let current_payload = *self.current_payload_id.read().await; | ||
if current_payload != payload_id { | ||
tracing::error!( | ||
|
@@ -334,7 +324,7 @@ impl EngineApiExt for FlashblocksService { | |
} else { | ||
tracing::debug!(message = "Forkchoice updated with no payload ID"); | ||
} | ||
Ok(result) | ||
Ok(resp) | ||
} | ||
|
||
async fn new_payload(&self, new_payload: NewPayload) -> ClientResult<PayloadStatus> { | ||
|
@@ -347,22 +337,19 @@ impl EngineApiExt for FlashblocksService { | |
version: PayloadVersion, | ||
) -> ClientResult<OpExecutionPayloadEnvelope> { | ||
// First try to get the best flashblocks payload from the builder if it exists | ||
|
||
match self.get_best_payload(version, payload_id).await { | ||
Ok(Some(payload)) => { | ||
Ok(payload) => { | ||
info!(message = "Returning fb payload"); | ||
return Ok(payload); | ||
} | ||
Ok(None) => { | ||
info!(message = "No flashblocks payload available"); | ||
Ok(payload) | ||
} | ||
Err(e) => { | ||
error!(message = "Error getting fb best payload", error = %e); | ||
error!(message = "Error getting fb best payload, falling back on client", error = %e); | ||
info!(message = "Falling back to get_payload on client", payload_id = %payload_id); | ||
let result = self.client.get_payload(payload_id, version).await?; | ||
Ok(result) | ||
} | ||
} | ||
|
||
info!(message = "Falling back to get_payload on client", payload_id = %payload_id); | ||
let result = self.client.get_payload(payload_id, version).await?; | ||
Ok(result) | ||
} | ||
|
||
async fn get_block_by_number( | ||
|
@@ -460,28 +447,28 @@ mod tests { | |
assert!(result.is_err()); | ||
assert_eq!(result.unwrap_err(), FlashblocksError::MissingBasePayload); | ||
|
||
// Error: First payload must have index 0 | ||
// Ok: First payload is correct if it has base and index 0 | ||
let result = builder.extend(FlashblocksPayloadV1 { | ||
payload_id: PayloadId::default(), | ||
index: 1, | ||
index: 0, | ||
base: Some(ExecutionPayloadBaseV1 { | ||
..Default::default() | ||
}), | ||
..Default::default() | ||
}); | ||
assert!(result.is_err()); | ||
assert_eq!(result.unwrap_err(), FlashblocksError::UnexpectedBasePayload); | ||
assert!(result.is_ok()); | ||
|
||
// Ok: First payload is correct if it has base and index 0 | ||
// Error: First payload must have index 0 | ||
let result = builder.extend(FlashblocksPayloadV1 { | ||
payload_id: PayloadId::default(), | ||
index: 0, | ||
index: 1, | ||
base: Some(ExecutionPayloadBaseV1 { | ||
..Default::default() | ||
}), | ||
..Default::default() | ||
}); | ||
assert!(result.is_ok()); | ||
assert!(result.is_err()); | ||
assert_eq!(result.unwrap_err(), FlashblocksError::UnexpectedBasePayload); | ||
|
||
// Error: Second payload must have a follow-up index | ||
let result = builder.extend(FlashblocksPayloadV1 { | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to add a test for this before rewriting it to make sure the migration works as expected. I can do this today.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#362
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#363