Skip to content

Commit

Permalink
Merge pull request #85 from ethereum-optimism/refcell/attributes-queu…
Browse files Browse the repository at this point in the history
…e-fixes

fix(derive): Abstractions and Testing
  • Loading branch information
refcell authored Apr 4, 2024
2 parents 85a7a19 + d1470b0 commit dc2bbe6
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 40 deletions.
231 changes: 194 additions & 37 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
//! Contains the logic for the `AttributesQueue` stage.
use crate::{
stages::batch_queue::BatchQueue,
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, SafeBlockFetcher,
TelemetryProvider,
},
traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
types::{
AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
Expand All @@ -19,13 +15,23 @@ use core::fmt::Debug;
pub trait AttributesBuilder {
/// Prepare the payload attributes.
fn prepare_payload_attributes(
&self,
&mut self,
l2_parent: L2BlockInfo,
epoch: BlockID,
) -> anyhow::Result<PayloadAttributes>;
}

/// [AttributesQueue] accepts batches from the [super::BatchQueue] stage
/// [AttributesProvider] is a trait abstraction that generalizes the [BatchQueue] stage.
#[async_trait]
pub trait AttributesProvider {
/// Returns the next valid batch upon the given safe head.
async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult<SingleBatch>;

/// Returns whether the current batch is the last in its span.
fn is_last_in_span(&self) -> bool;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
/// pulls in data about the current L2 safe head.
Expand All @@ -36,18 +42,16 @@ pub trait AttributesBuilder {
/// This stage can be reset by clearing its batch buffer.
/// This stage does not need to retain any references to L1 blocks.
#[derive(Debug)]
pub struct AttributesQueue<DAP, CP, BF, T, AB>
pub struct AttributesQueue<P, T, AB>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
cfg: RollupConfig,
/// The previous stage of the derivation pipeline.
prev: BatchQueue<DAP, CP, BF, T>,
prev: P,
/// Telemetry provider.
telemetry: T,
/// Whether the current batch is the last in its span.
Expand All @@ -58,21 +62,14 @@ where
builder: AB,
}

impl<DAP, CP, BF, T, AB> AttributesQueue<DAP, CP, BF, T, AB>
impl<P, T, AB> AttributesQueue<P, T, AB>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
pub fn new(
cfg: RollupConfig,
prev: BatchQueue<DAP, CP, BF, T>,
telemetry: T,
builder: AB,
) -> Self {
pub fn new(cfg: RollupConfig, prev: P, telemetry: T, builder: AB) -> Self {
Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder }
}

Expand All @@ -81,7 +78,7 @@ where
self.prev.origin()
}

/// Loads a batch from the previous stage if needed.
/// Loads a [SingleBatch] from the [AttributesProvider] if needed.
pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult<SingleBatch> {
if self.batch.is_none() {
let batch = self.prev.next_batch(parent).await?;
Expand All @@ -91,29 +88,27 @@ where
self.batch.as_ref().cloned().ok_or(StageError::Eof)
}

/// Returns the next payload attributes from the current batch.
/// Returns the next [AttributesWithParent] from the current batch.
pub async fn next_attributes(
&mut self,
parent: L2BlockInfo,
) -> StageResult<AttributesWithParent> {
// Load the batch
// Load the batch.
let batch = self.load_batch(parent).await?;

// Construct the payload attributes from the loaded batch
// Construct the payload attributes from the loaded batch.
let attributes = self.create_next_attributes(batch, parent).await?;
let populated_attributes =
AttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span };

// Clear out the local state once we will succeed
// Clear out the local state once payload attributes are prepared.
self.batch = None;
self.is_last_in_span = false;
Ok(populated_attributes)
}

/// Creates the next attributes.
/// Transforms a [SingleBatch] into [PayloadAttributes].
/// This sets `NoTxPool` and appends the batched transactions to the attributes transaction
/// list.
/// Creates the next attributes, transforming a [SingleBatch] into [PayloadAttributes].
/// This sets `no_tx_pool` and appends the batched txs to the attributes tx list.
pub async fn create_next_attributes(
&mut self,
batch: SingleBatch,
Expand All @@ -135,7 +130,10 @@ where

// Prepare the payload attributes
let tx_count = batch.transactions.len();
let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch())?;
let mut attributes = self
.builder
.prepare_payload_attributes(parent, batch.epoch())
.map_err(StageError::AttributesBuild)?;
attributes.no_tx_pool = true;
attributes.transactions.extend(batch.transactions);

Expand All @@ -153,19 +151,178 @@ where
}

#[async_trait]
impl<DAP, CP, BF, T, AB> ResettableStage for AttributesQueue<DAP, CP, BF, T, AB>
impl<P, T, AB> ResettableStage for AttributesQueue<P, T, AB>
where
DAP: DataAvailabilityProvider + Send + Debug,
CP: ChainProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
P: AttributesProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info);
// TODO: metrice the reset
// TODO: metrice the reset using telemetry
// telemetry can provide a method of logging and metricing
self.batch = None;
self.is_last_in_span = false;
Err(StageError::Eof)
}
}

#[cfg(test)]
mod tests {
use super::{
AttributesQueue, AttributesWithParent, BlockInfo, L2BlockInfo, PayloadAttributes,
RollupConfig, SingleBatch, StageError, StageResult,
};
use crate::{
stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue},
traits::test_utils::TestTelemetry,
types::RawTransaction,
};
use alloc::{vec, vec::Vec};
use alloy_primitives::b256;

fn new_attributes_queue(
cfg: Option<RollupConfig>,
origin: Option<BlockInfo>,
batches: Vec<StageResult<SingleBatch>>,
) -> AttributesQueue<MockBatchQueue, TestTelemetry, MockAttributesBuilder> {
let cfg = cfg.unwrap_or_default();
let telemetry = TestTelemetry::new();
let mock_batch_queue = new_mock_batch_queue(origin, batches);
let mock_attributes_builder = MockAttributesBuilder::default();
AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder)
}

#[tokio::test]
async fn test_load_batch_eof() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let result = attributes_queue.load_batch(parent).await.unwrap_err();
assert_eq!(result, StageError::Eof);
}

#[tokio::test]
async fn test_load_batch_last_in_span() {
let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]);
let parent = L2BlockInfo::default();
let result = attributes_queue.load_batch(parent).await.unwrap();
assert_eq!(result, Default::default());
assert!(attributes_queue.is_last_in_span);
}

#[tokio::test]
async fn test_create_next_attributes_bad_parent_hash() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666");
let parent = L2BlockInfo {
block_info: BlockInfo { hash: bad_hash, ..Default::default() },
..Default::default()
};
let batch = SingleBatch::default();
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(
result,
StageError::Reset(super::ResetError::BadParentHash(Default::default(), bad_hash))
);
}

#[tokio::test]
async fn test_create_next_attributes_bad_timestamp() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let batch = SingleBatch { timestamp: 1, ..Default::default() };
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 0)));
}

#[tokio::test]
async fn test_create_next_attributes_bad_parent_timestamp() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo {
block_info: BlockInfo { timestamp: 2, ..Default::default() },
..Default::default()
};
let batch = SingleBatch { timestamp: 1, ..Default::default() };
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2)));
}

#[tokio::test]
async fn test_create_next_attributes_bad_config_timestamp() {
let cfg = RollupConfig { block_time: 1, ..Default::default() };
let mut attributes_queue = new_attributes_queue(Some(cfg), None, vec![]);
let parent = L2BlockInfo {
block_info: BlockInfo { timestamp: 1, ..Default::default() },
..Default::default()
};
let batch = SingleBatch { timestamp: 1, ..Default::default() };
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2)));
}

#[tokio::test]
async fn test_create_next_attributes_preparation_fails() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let batch = SingleBatch::default();
let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
assert_eq!(
result,
StageError::AttributesBuild(anyhow::anyhow!("missing payload attribute"))
);
}

#[tokio::test]
async fn test_create_next_attributes_success() {
let cfg = RollupConfig::default();
let telemetry = TestTelemetry::new();
let mock_batch_queue = new_mock_batch_queue(None, vec![]);
let mut payload_attributes = PayloadAttributes::default();
let mock_builder =
MockAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] };
let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder);
let parent = L2BlockInfo::default();
let txs = vec![RawTransaction::default(), RawTransaction::default()];
let batch = SingleBatch { transactions: txs.clone(), ..Default::default() };
let attributes = aq.create_next_attributes(batch, parent).await.unwrap();
// update the expected attributes
payload_attributes.no_tx_pool = true;
payload_attributes.transactions.extend(txs);
assert_eq!(attributes, payload_attributes);
}

#[tokio::test]
async fn test_next_attributes_load_batch_eof() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
let parent = L2BlockInfo::default();
let result = attributes_queue.next_attributes(parent).await.unwrap_err();
assert_eq!(result, StageError::Eof);
}

#[tokio::test]
async fn test_next_attributes_load_batch_last_in_span() {
let cfg = RollupConfig::default();
let telemetry = TestTelemetry::new();
let mock_batch_queue = new_mock_batch_queue(None, vec![Ok(Default::default())]);
let mut pa = PayloadAttributes::default();
let mock_builder = MockAttributesBuilder { attributes: vec![Ok(pa.clone())] };
let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder);
// If we load the batch, we should get the last in span.
// But it won't take it so it will be available in the next_attributes call.
let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap();
assert!(aq.is_last_in_span);
assert!(aq.batch.is_some());
// This should successfully construct the next payload attributes.
// It should also reset the last in span flag and clear the batch.
let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap();
pa.no_tx_pool = true;
let populated_attributes = AttributesWithParent {
attributes: pa,
parent: L2BlockInfo::default(),
is_last_in_span: true,
};
assert_eq!(attributes, populated_attributes);
assert!(!aq.is_last_in_span);
assert!(aq.batch.is_none());
}
}
3 changes: 3 additions & 0 deletions crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ pub use batch_queue::BatchQueue;

mod attributes_queue;
pub use attributes_queue::AttributesQueue;

#[cfg(test)]
pub mod test_utils;
25 changes: 25 additions & 0 deletions crates/derive/src/stages/test_utils/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! Testing utilities for the attributes queue stage.
use crate::{
stages::attributes_queue::AttributesBuilder,
types::{BlockID, L2BlockInfo, PayloadAttributes},
};
use alloc::vec::Vec;

/// A mock implementation of the [`AttributesBuilder`] for testing.
#[derive(Debug, Default)]
pub struct MockAttributesBuilder {
/// The attributes to return.
pub attributes: Vec<anyhow::Result<PayloadAttributes>>,
}

impl AttributesBuilder for MockAttributesBuilder {
/// Prepares the [PayloadAttributes] for the next payload.
fn prepare_payload_attributes(
&mut self,
_l2_parent: L2BlockInfo,
_epoch: BlockID,
) -> anyhow::Result<PayloadAttributes> {
self.attributes.pop().ok_or(anyhow::anyhow!("missing payload attribute"))?
}
}
Loading

0 comments on commit dc2bbe6

Please sign in to comment.