From 5fcd2b08b348fe28e68c13cff1b03af594d4061b Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 11:24:43 -0400 Subject: [PATCH] fix(derive): impl origin provider trait across stages --- crates/derive/src/stages/attributes_queue.rs | 16 ++++++++++----- crates/derive/src/stages/batch_queue.rs | 19 ++++++++++++------ crates/derive/src/stages/channel_bank.rs | 19 ++++++++++++------ crates/derive/src/stages/channel_reader.rs | 20 +++++++++++++------ crates/derive/src/stages/frame_queue.rs | 19 ++++++++++++------ crates/derive/src/stages/l1_retrieval.rs | 21 ++++++++++++-------- crates/derive/src/stages/l1_traversal.rs | 13 ++++++------ 7 files changed, 84 insertions(+), 43 deletions(-) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 4771a1b37..7d3224373 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -73,11 +73,6 @@ where Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder } } - /// Returns the L1 origin [BlockInfo]. - pub fn origin(&self) -> Option<&BlockInfo> { - self.prev.origin() - } - /// Loads a [SingleBatch] from the [AttributesProvider] if needed. pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult { if self.batch.is_none() { @@ -150,6 +145,17 @@ where } } +impl OriginProvider for AttributesQueue +where + P: AttributesProvider + OriginProvider + Debug, + T: TelemetryProvider + Debug, + AB: AttributesBuilder + Debug, +{ + fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } +} + #[async_trait] impl ResettableStage for AttributesQueue where diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 06a769d2f..509d22ff9 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -3,7 +3,7 @@ use crate::{ stages::channel_reader::ChannelReader, traits::{ - ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher, + ChainProvider, DataAvailabilityProvider, OriginProvider, ResettableStage, SafeBlockFetcher, TelemetryProvider, }, types::{ @@ -83,11 +83,6 @@ where } } - /// Returns the L1 origin [BlockInfo]. - pub fn origin(&self) -> Option<&BlockInfo> { - self.prev.origin() - } - /// Returns if the previous batch was the last in the span. pub fn is_last_in_span(&self) -> bool { self.next_spans.is_empty() @@ -353,6 +348,18 @@ where } } +impl OriginProvider for BatchQueue +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, + BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, +{ + fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } +} + #[async_trait] impl ResettableStage for BatchQueue where diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index fd9518328..a3126d058 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -4,7 +4,8 @@ use super::frame_queue::FrameQueue; use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, traits::{ - ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider, + ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage, + TelemetryProvider, }, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; @@ -56,11 +57,6 @@ where Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } } - /// Returns the L1 origin [BlockInfo]. - pub fn origin(&self) -> Option<&BlockInfo> { - self.prev.origin() - } - /// Returns the size of the channel bank by accumulating over all channels. pub fn size(&self) -> usize { self.channels.iter().fold(0, |acc, (_, c)| acc + c.size()) @@ -202,6 +198,17 @@ where } } +impl OriginProvider for ChannelBank +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, +{ + fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } +} + #[async_trait] impl ResettableStage for ChannelBank where diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 5708f97e5..0d63b9d46 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,7 +2,9 @@ use super::channel_bank::ChannelBank; use crate::{ - traits::{ChainProvider, DataAvailabilityProvider, LogLevel, TelemetryProvider}, + traits::{ + ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, TelemetryProvider, + }, types::{Batch, BlockInfo, StageError, StageResult}, }; @@ -70,11 +72,6 @@ where Ok(()) } - /// Returns the L1 origin [BlockInfo]. - pub fn origin(&self) -> Option<&BlockInfo> { - self.prev.origin() - } - /// Forces the read to continue with the next channel, resetting any /// decoding / decompression state to a fresh start. pub fn next_channel(&mut self) { @@ -82,6 +79,17 @@ where } } +impl OriginProvider for ChannelReader +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, +{ + fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } +} + /// Batch Reader provides a function that iteratively consumes batches from the reader. /// The L1Inclusion block is also provided at creation time. /// Warning: the batch reader can read every batch-type. diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 73d1b9fd0..65d4f8c26 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -5,7 +5,8 @@ use core::fmt::Debug; use super::l1_retrieval::L1Retrieval; use crate::{ traits::{ - ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider, + ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage, + TelemetryProvider, }, types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; @@ -41,11 +42,6 @@ where Self { prev, telemetry, queue: VecDeque::new() } } - /// Returns the L1 [BlockInfo] origin. - pub fn origin(&self) -> Option<&BlockInfo> { - self.prev.origin() - } - /// Fetches the next frame from the [FrameQueue]. pub async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { @@ -81,6 +77,17 @@ where } } +impl OriginProvider for FrameQueue +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, +{ + fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } +} + #[async_trait] impl ResettableStage for FrameQueue where diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 39f09bb57..f0d6dfd5c 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -3,8 +3,8 @@ use super::L1Traversal; use crate::{ traits::{ - ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, ResettableStage, - TelemetryProvider, + ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, OriginProvider, + ResettableStage, TelemetryProvider, }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; @@ -47,12 +47,6 @@ where Self { prev, telemetry, provider, data: None } } - /// Returns the current L1 [BlockInfo] origin from the previous - /// [L1Traversal] stage. - pub fn origin(&self) -> Option<&BlockInfo> { - self.prev.origin() - } - /// Retrieves the next data item from the [L1Retrieval] stage. /// Returns an error if there is no data. pub async fn next_data(&mut self) -> StageResult { @@ -81,6 +75,17 @@ where } } +impl OriginProvider for L1Retrieval +where + DAP: DataAvailabilityProvider, + CP: ChainProvider, + T: TelemetryProvider, +{ + fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } +} + #[async_trait] impl ResettableStage for L1Retrieval where diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 7cab80805..c4e04d32e 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -1,7 +1,7 @@ //! Contains the [L1Traversal] stage of the derivation pipeline. use crate::{ - traits::{ChainProvider, LogLevel, ResettableStage, TelemetryProvider}, + traits::{ChainProvider, LogLevel, OriginProvider, ResettableStage, TelemetryProvider}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; @@ -61,11 +61,6 @@ impl L1Traversal { } } - /// Returns the current L1 [BlockInfo] in the [L1Traversal] stage, if it exists. - pub fn origin(&self) -> Option<&BlockInfo> { - self.block.as_ref() - } - /// Advances the internal state of the [L1Traversal] stage to the next L1 block. /// This function fetches the next L1 [BlockInfo] from the data source and updates the /// [SystemConfig] with the receipts from the block. @@ -112,6 +107,12 @@ impl L1Traversal { } } +impl OriginProvider for L1Traversal { + fn origin(&self) -> Option<&BlockInfo> { + self.block.as_ref() + } +} + #[async_trait] impl ResettableStage for L1Traversal { async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {