Skip to content

Commit

Permalink
fix(derive): attributes queue
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Apr 4, 2024
1 parent ccc6af0 commit 85a7a19
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
use crate::{
stages::batch_queue::BatchQueue,
traits::{ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher},
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, SafeBlockFetcher,
TelemetryProvider,
},
types::{
AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
},
};
use alloc::boxed::Box;
use alloy_primitives::Bytes;
use async_trait::async_trait;
use core::fmt::Debug;

Expand All @@ -32,17 +36,20 @@ pub trait AttributesBuilder {
/// This stage can be reset by clearing its batch buffer.
/// This stage does not need to retain any references to L1 blocks.
#[derive(Debug)]
pub struct AttributesQueue<DAP, CP, BF, AB>
pub struct AttributesQueue<DAP, CP, BF, T, AB>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
cfg: RollupConfig,
/// The previous stage of the derivation pipeline.
prev: BatchQueue<DAP, CP, BF>,
prev: BatchQueue<DAP, CP, BF, T>,
/// Telemetry provider.
telemetry: T,
/// Whether the current batch is the last in its span.
is_last_in_span: bool,
/// The current batch being processed.
Expand All @@ -51,16 +58,22 @@ where
builder: AB,
}

impl<DAP, CP, BF, AB> AttributesQueue<DAP, CP, BF, AB>
impl<DAP, CP, BF, T, AB> AttributesQueue<DAP, CP, BF, T, AB>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
pub fn new(cfg: RollupConfig, prev: BatchQueue<DAP, CP, BF>, builder: AB) -> Self {
Self { cfg, prev, is_last_in_span: false, batch: None, builder }
pub fn new(
cfg: RollupConfig,
prev: BatchQueue<DAP, CP, BF, T>,
telemetry: T,
builder: AB,
) -> Self {
Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder }
}

/// Returns the L1 origin [BlockInfo].
Expand Down Expand Up @@ -121,31 +134,35 @@ where
}

// Prepare the payload attributes
let tx_count = batch.transactions.len();
let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch())?;
attributes.no_tx_pool = true;
attributes.transactions.extend(batch.transactions);

// TODO: log
// log::info!(
// "generated attributes in payload queue",
// "txs" => tx_count,
// "timestamp" => batch.timestamp,
// );
self.telemetry.write(
Bytes::from(alloc::format!(
"generated attributes in payload queue: txs={}, timestamp={}",
tx_count,
batch.timestamp,
)),
LogLevel::Info,
);

Ok(attributes)
}
}

#[async_trait]
impl<DAP, CP, BF, AB> ResettableStage for AttributesQueue<DAP, CP, BF, AB>
impl<DAP, CP, BF, T, AB> ResettableStage for AttributesQueue<DAP, CP, BF, T, AB>
where
DAP: DataAvailabilityProvider + Send + Debug,
CP: ChainProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
T: TelemetryProvider + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
// TODO: log the reset
self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info);
// TODO: metrice the reset
self.batch = None;
self.is_last_in_span = false;
Expand Down

0 comments on commit 85a7a19

Please sign in to comment.