diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index f0067c399..4950fc8fd 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -2,13 +2,17 @@ use crate::{ stages::batch_queue::BatchQueue, - traits::{ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher}, + traits::{ + ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, SafeBlockFetcher, + TelemetryProvider, + }, types::{ AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, }, }; use alloc::boxed::Box; +use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; @@ -32,17 +36,20 @@ pub trait AttributesBuilder { /// This stage can be reset by clearing its batch buffer. /// This stage does not need to retain any references to L1 blocks. #[derive(Debug)] -pub struct AttributesQueue +pub struct AttributesQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. cfg: RollupConfig, /// The previous stage of the derivation pipeline. - prev: BatchQueue, + prev: BatchQueue, + /// Telemetry provider. + telemetry: T, /// Whether the current batch is the last in its span. is_last_in_span: bool, /// The current batch being processed. @@ -51,16 +58,22 @@ where builder: AB, } -impl AttributesQueue +impl AttributesQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub fn new(cfg: RollupConfig, prev: BatchQueue, builder: AB) -> Self { - Self { cfg, prev, is_last_in_span: false, batch: None, builder } + pub fn new( + cfg: RollupConfig, + prev: BatchQueue, + telemetry: T, + builder: AB, + ) -> Self { + Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder } } /// Returns the L1 origin [BlockInfo]. @@ -121,31 +134,35 @@ where } // Prepare the payload attributes + let tx_count = batch.transactions.len(); let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch())?; attributes.no_tx_pool = true; attributes.transactions.extend(batch.transactions); - // TODO: log - // log::info!( - // "generated attributes in payload queue", - // "txs" => tx_count, - // "timestamp" => batch.timestamp, - // ); + self.telemetry.write( + Bytes::from(alloc::format!( + "generated attributes in payload queue: txs={}, timestamp={}", + tx_count, + batch.timestamp, + )), + LogLevel::Info, + ); Ok(attributes) } } #[async_trait] -impl ResettableStage for AttributesQueue +impl ResettableStage for AttributesQueue where DAP: DataAvailabilityProvider + Send + Debug, CP: ChainProvider + Send + Debug, BF: SafeBlockFetcher + Send + Debug, + T: TelemetryProvider + Send + Debug, AB: AttributesBuilder + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { - // TODO: log the reset + self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info); // TODO: metrice the reset self.batch = None; self.is_last_in_span = false;