Skip to content

Commit

Permalink
Merge pull request #69 from ethereum-optimism/refcell/payload-queue
Browse files Browse the repository at this point in the history
feat(derive): Attributes Queue
  • Loading branch information
refcell authored Apr 4, 2024
2 parents 92fcc87 + dc2bbe6 commit bbad960
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 15 deletions.
328 changes: 328 additions & 0 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
//! Contains the logic for the `AttributesQueue` stage.
use crate::{
traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
types::{
AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
},
};
use alloc::boxed::Box;
use alloy_primitives::Bytes;
use async_trait::async_trait;
use core::fmt::Debug;

pub trait AttributesBuilder {
/// Prepare the payload attributes.
fn prepare_payload_attributes(
&mut self,
l2_parent: L2BlockInfo,
epoch: BlockID,
) -> anyhow::Result<PayloadAttributes>;
}

/// [AttributesProvider] is a trait abstraction that generalizes the [BatchQueue] stage.
#[async_trait]
pub trait AttributesProvider {
/// Returns the next valid batch upon the given safe head.
async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult<SingleBatch>;

/// Returns whether the current batch is the last in its span.
fn is_last_in_span(&self) -> bool;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
/// pulls in data about the current L2 safe head.
///
/// [AttributesQueue] also buffers batches that have been output because
/// multiple batches can be created at once.
///
/// This stage can be reset by clearing its batch buffer.
/// This stage does not need to retain any references to L1 blocks.
#[derive(Debug)]
pub struct AttributesQueue<P, T, AB>
where
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
cfg: RollupConfig,
/// The previous stage of the derivation pipeline.
prev: P,
/// Telemetry provider.
telemetry: T,
/// Whether the current batch is the last in its span.
is_last_in_span: bool,
/// The current batch being processed.
batch: Option<SingleBatch>,
/// The attributes builder.
builder: AB,
}

impl<P, T, AB> AttributesQueue<P, T, AB>
where
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
pub fn new(cfg: RollupConfig, prev: P, telemetry: T, builder: AB) -> Self {
Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder }
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Loads a [SingleBatch] from the [AttributesProvider] if needed.
pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult<SingleBatch> {
if self.batch.is_none() {
let batch = self.prev.next_batch(parent).await?;
self.batch = Some(batch);
self.is_last_in_span = self.prev.is_last_in_span();
}
self.batch.as_ref().cloned().ok_or(StageError::Eof)
}

/// Returns the next [AttributesWithParent] from the current batch.
pub async fn next_attributes(
&mut self,
parent: L2BlockInfo,
) -> StageResult<AttributesWithParent> {
// Load the batch.
let batch = self.load_batch(parent).await?;

// Construct the payload attributes from the loaded batch.
let attributes = self.create_next_attributes(batch, parent).await?;
let populated_attributes =
AttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span };

// Clear out the local state once payload attributes are prepared.
self.batch = None;
self.is_last_in_span = false;
Ok(populated_attributes)
}

/// Creates the next attributes, transforming a [SingleBatch] into [PayloadAttributes].
/// This sets `no_tx_pool` and appends the batched txs to the attributes tx list.
pub async fn create_next_attributes(
&mut self,
batch: SingleBatch,
parent: L2BlockInfo,
) -> StageResult<PayloadAttributes> {
// Sanity check parent hash
if batch.parent_hash != parent.block_info.hash {
return Err(StageError::Reset(ResetError::BadParentHash(
batch.parent_hash,
parent.block_info.hash,
)));
}

// Sanity check timestamp
let actual = parent.block_info.timestamp + self.cfg.block_time;
if actual != batch.timestamp {
return Err(StageError::Reset(ResetError::BadTimestamp(batch.timestamp, actual)));
}

// Prepare the payload attributes
let tx_count = batch.transactions.len();
let mut attributes = self
.builder
.prepare_payload_attributes(parent, batch.epoch())
.map_err(StageError::AttributesBuild)?;
attributes.no_tx_pool = true;
attributes.transactions.extend(batch.transactions);

self.telemetry.write(
Bytes::from(alloc::format!(
"generated attributes in payload queue: txs={}, timestamp={}",
tx_count,
batch.timestamp,
)),
LogLevel::Info,
);

Ok(attributes)
}
}

#[async_trait]
impl<P, T, AB> ResettableStage for AttributesQueue<P, T, AB>
where
P: AttributesProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info);
// TODO: metrice the reset using telemetry
// telemetry can provide a method of logging and metricing
self.batch = None;
self.is_last_in_span = false;
Err(StageError::Eof)
}
}

#[cfg(test)]
mod tests {
use super::{
AttributesQueue, AttributesWithParent, BlockInfo, L2BlockInfo, PayloadAttributes,
RollupConfig, SingleBatch, StageError, StageResult,
};
use crate::{
stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue},
traits::test_utils::TestTelemetry,
types::RawTransaction,
};
use alloc::{vec, vec::Vec};
use alloy_primitives::b256;

fn new_attributes_queue(
cfg: Option<RollupConfig>,
origin: Option<BlockInfo>,
batches: Vec<StageResult<SingleBatch>>,
) -> AttributesQueue<MockBatchQueue, TestTelemetry, MockAttributesBuilder> {
let cfg = cfg.unwrap_or_default();
let telemetry = TestTelemetry::new();
let mock_batch_queue = new_mock_batch_queue(origin, batches);
let mock_attributes_builder = MockAttributesBuilder::default();
AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder)
}

#[tokio::test]
async fn test_load_batch_eof() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let result = attributes_queue.load_batch(parent).await.unwrap_err();
assert_eq!(result, StageError::Eof);
}

#[tokio::test]
async fn test_load_batch_last_in_span() {
let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]);
let parent = L2BlockInfo::default();
let result = attributes_queue.load_batch(parent).await.unwrap();
assert_eq!(result, Default::default());
assert!(attributes_queue.is_last_in_span);
}

#[tokio::test]
async fn test_create_next_attributes_bad_parent_hash() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666");
let parent = L2BlockInfo {
block_info: BlockInfo { hash: bad_hash, ..Default::default() },
..Default::default()
};
let batch = SingleBatch::default();
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(
result,
StageError::Reset(super::ResetError::BadParentHash(Default::default(), bad_hash))
);
}

#[tokio::test]
async fn test_create_next_attributes_bad_timestamp() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let batch = SingleBatch { timestamp: 1, ..Default::default() };
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 0)));
}

#[tokio::test]
async fn test_create_next_attributes_bad_parent_timestamp() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo {
block_info: BlockInfo { timestamp: 2, ..Default::default() },
..Default::default()
};
let batch = SingleBatch { timestamp: 1, ..Default::default() };
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2)));
}

#[tokio::test]
async fn test_create_next_attributes_bad_config_timestamp() {
let cfg = RollupConfig { block_time: 1, ..Default::default() };
let mut attributes_queue = new_attributes_queue(Some(cfg), None, vec![]);
let parent = L2BlockInfo {
block_info: BlockInfo { timestamp: 1, ..Default::default() },
..Default::default()
};
let batch = SingleBatch { timestamp: 1, ..Default::default() };
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2)));
}

#[tokio::test]
async fn test_create_next_attributes_preparation_fails() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let batch = SingleBatch::default();
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(
result,
StageError::AttributesBuild(anyhow::anyhow!("missing payload attribute"))
);
}

#[tokio::test]
async fn test_create_next_attributes_success() {
let cfg = RollupConfig::default();
let telemetry = TestTelemetry::new();
let mock_batch_queue = new_mock_batch_queue(None, vec![]);
let mut payload_attributes = PayloadAttributes::default();
let mock_builder =
MockAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] };
let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder);
let parent = L2BlockInfo::default();
let txs = vec![RawTransaction::default(), RawTransaction::default()];
let batch = SingleBatch { transactions: txs.clone(), ..Default::default() };
let attributes = aq.create_next_attributes(batch, parent).await.unwrap();
// update the expected attributes
payload_attributes.no_tx_pool = true;
payload_attributes.transactions.extend(txs);
assert_eq!(attributes, payload_attributes);
}

#[tokio::test]
async fn test_next_attributes_load_batch_eof() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let result = attributes_queue.next_attributes(parent).await.unwrap_err();
assert_eq!(result, StageError::Eof);
}

#[tokio::test]
async fn test_next_attributes_load_batch_last_in_span() {
let cfg = RollupConfig::default();
let telemetry = TestTelemetry::new();
let mock_batch_queue = new_mock_batch_queue(None, vec![Ok(Default::default())]);
let mut pa = PayloadAttributes::default();
let mock_builder = MockAttributesBuilder { attributes: vec![Ok(pa.clone())] };
let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder);
// If we load the batch, we should get the last in span.
// But it won't take it so it will be available in the next_attributes call.
let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap();
assert!(aq.is_last_in_span);
assert!(aq.batch.is_some());
// This should successfully construct the next payload attributes.
// It should also reset the last in span flag and clear the batch.
let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap();
pa.no_tx_pool = true;
let populated_attributes = AttributesWithParent {
attributes: pa,
parent: L2BlockInfo::default(),
is_last_in_span: true,
};
assert_eq!(attributes, populated_attributes);
assert!(!aq.is_last_in_span);
assert!(aq.batch.is_none());
}
}
5 changes: 5 additions & 0 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ where
self.prev.origin()
}

/// Returns if the previous batch was the last in the span.
pub fn is_last_in_span(&self) -> bool {
self.next_spans.is_empty()
}

/// Pops the next batch from the current queued up span-batch cache.
/// The parent is used to set the parent hash of the batch.
/// The parent is verified when the batch is later validated.
Expand Down
1 change: 0 additions & 1 deletion crates/derive/src/stages/engine_queue.rs

This file was deleted.

10 changes: 7 additions & 3 deletions crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
//! the produced execution payloads.
//!
//! **Stages:**
//!
//! 1. L1 Traversal
//! 2. L1 Retrieval
//! 3. Frame Queue
//! 4. Channel Bank
//! 5. Channel Reader (Batch Decoding)
//! 6. Batch Queue
//! 7. Payload Attributes Derivation
//! 8. Engine Queue
//! 8. (Omitted) Engine Queue
mod l1_traversal;
pub use l1_traversal::L1Traversal;
Expand All @@ -30,5 +31,8 @@ pub use channel_reader::ChannelReader;
mod batch_queue;
pub use batch_queue::BatchQueue;

mod engine_queue;
mod payload_derivation;
mod attributes_queue;
pub use attributes_queue::AttributesQueue;

#[cfg(test)]
pub mod test_utils;
1 change: 0 additions & 1 deletion crates/derive/src/stages/payload_derivation.rs

This file was deleted.

25 changes: 25 additions & 0 deletions crates/derive/src/stages/test_utils/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! Testing utilities for the attributes queue stage.
use crate::{
stages::attributes_queue::AttributesBuilder,
types::{BlockID, L2BlockInfo, PayloadAttributes},
};
use alloc::vec::Vec;

/// A mock implementation of the [`AttributesBuilder`] for testing.
#[derive(Debug, Default)]
pub struct MockAttributesBuilder {
/// The attributes to return.
pub attributes: Vec<anyhow::Result<PayloadAttributes>>,
}

impl AttributesBuilder for MockAttributesBuilder {
/// Prepares the [PayloadAttributes] for the next payload.
fn prepare_payload_attributes(
&mut self,
_l2_parent: L2BlockInfo,
_epoch: BlockID,
) -> anyhow::Result<PayloadAttributes> {
self.attributes.pop().ok_or(anyhow::anyhow!("missing payload attribute"))?
}
}
Loading

0 comments on commit bbad960

Please sign in to comment.