Skip to content

Commit

Permalink
fix(derive): impl origin provider trait across stages
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Apr 4, 2024
1 parent bbad960 commit 5fcd2b0
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 43 deletions.
16 changes: 11 additions & 5 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ where
Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder }
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Loads a [SingleBatch] from the [AttributesProvider] if needed.
pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult<SingleBatch> {
if self.batch.is_none() {
Expand Down Expand Up @@ -150,6 +145,17 @@ where
}
}

impl<P, T, AB> OriginProvider for AttributesQueue<P, T, AB>
where
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<P, T, AB> ResettableStage for AttributesQueue<P, T, AB>
where
Expand Down
19 changes: 13 additions & 6 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::{
stages::channel_reader::ChannelReader,
traits::{
ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher,
ChainProvider, DataAvailabilityProvider, OriginProvider, ResettableStage, SafeBlockFetcher,
TelemetryProvider,
},
types::{
Expand Down Expand Up @@ -83,11 +83,6 @@ where
}
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Returns if the previous batch was the last in the span.
pub fn is_last_in_span(&self) -> bool {
self.next_spans.is_empty()
Expand Down Expand Up @@ -353,6 +348,18 @@ where
}
}

impl<DAP, CP, BF, T> OriginProvider for BatchQueue<DAP, CP, BF, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, BF, T> ResettableStage for BatchQueue<DAP, CP, BF, T>
where
Expand Down
19 changes: 13 additions & 6 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::frame_queue::FrameQueue;
use crate::{
params::{ChannelID, MAX_CHANNEL_BANK_SIZE},
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider,
ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage,
TelemetryProvider,
},
types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig},
};
Expand Down Expand Up @@ -56,11 +57,6 @@ where
Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev }
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Returns the size of the channel bank by accumulating over all channels.
pub fn size(&self) -> usize {
self.channels.iter().fold(0, |acc, (_, c)| acc + c.size())
Expand Down Expand Up @@ -202,6 +198,17 @@ where
}
}

impl<DAP, CP, T> OriginProvider for ChannelBank<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, T> ResettableStage for ChannelBank<DAP, CP, T>
where
Expand Down
20 changes: 14 additions & 6 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
use super::channel_bank::ChannelBank;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider, LogLevel, TelemetryProvider},
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, TelemetryProvider,
},
types::{Batch, BlockInfo, StageError, StageResult},
};

Expand Down Expand Up @@ -70,18 +72,24 @@ where
Ok(())
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Forces the read to continue with the next channel, resetting any
/// decoding / decompression state to a fresh start.
pub fn next_channel(&mut self) {
self.next_batch = None;
}
}

impl<DAP, CP, T> OriginProvider for ChannelReader<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

/// Batch Reader provides a function that iteratively consumes batches from the reader.
/// The L1Inclusion block is also provided at creation time.
/// Warning: the batch reader can read every batch-type.
Expand Down
19 changes: 13 additions & 6 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use core::fmt::Debug;
use super::l1_retrieval::L1Retrieval;
use crate::{
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider,
ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage,
TelemetryProvider,
},
types::{BlockInfo, Frame, StageError, StageResult, SystemConfig},
};
Expand Down Expand Up @@ -41,11 +42,6 @@ where
Self { prev, telemetry, queue: VecDeque::new() }
}

/// Returns the L1 [BlockInfo] origin.
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Fetches the next frame from the [FrameQueue].
pub async fn next_frame(&mut self) -> StageResult<Frame> {
if self.queue.is_empty() {
Expand Down Expand Up @@ -81,6 +77,17 @@ where
}
}

impl<DAP, CP, T> OriginProvider for FrameQueue<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, T> ResettableStage for FrameQueue<DAP, CP, T>
where
Expand Down
21 changes: 13 additions & 8 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use super::L1Traversal;
use crate::{
traits::{
ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, ResettableStage,
TelemetryProvider,
ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, OriginProvider,
ResettableStage, TelemetryProvider,
},
types::{BlockInfo, StageError, StageResult, SystemConfig},
};
Expand Down Expand Up @@ -47,12 +47,6 @@ where
Self { prev, telemetry, provider, data: None }
}

/// Returns the current L1 [BlockInfo] origin from the previous
/// [L1Traversal] stage.
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Retrieves the next data item from the [L1Retrieval] stage.
/// Returns an error if there is no data.
pub async fn next_data(&mut self) -> StageResult<Bytes> {
Expand Down Expand Up @@ -81,6 +75,17 @@ where
}
}

impl<DAP, CP, T> OriginProvider for L1Retrieval<DAP, CP, T>
where
DAP: DataAvailabilityProvider,
CP: ChainProvider,
T: TelemetryProvider,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, T> ResettableStage for L1Retrieval<DAP, CP, T>
where
Expand Down
13 changes: 7 additions & 6 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the [L1Traversal] stage of the derivation pipeline.
use crate::{
traits::{ChainProvider, LogLevel, ResettableStage, TelemetryProvider},
traits::{ChainProvider, LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, sync::Arc};
Expand Down Expand Up @@ -61,11 +61,6 @@ impl<F: ChainProvider, T: TelemetryProvider> L1Traversal<F, T> {
}
}

/// Returns the current L1 [BlockInfo] in the [L1Traversal] stage, if it exists.
pub fn origin(&self) -> Option<&BlockInfo> {
self.block.as_ref()
}

/// Advances the internal state of the [L1Traversal] stage to the next L1 block.
/// This function fetches the next L1 [BlockInfo] from the data source and updates the
/// [SystemConfig] with the receipts from the block.
Expand Down Expand Up @@ -112,6 +107,12 @@ impl<F: ChainProvider, T: TelemetryProvider> L1Traversal<F, T> {
}
}

impl<F: ChainProvider, T: TelemetryProvider> OriginProvider for L1Traversal<F, T> {
fn origin(&self) -> Option<&BlockInfo> {
self.block.as_ref()
}
}

#[async_trait]
impl<F: ChainProvider + Send, T: TelemetryProvider + Send> ResettableStage for L1Traversal<F, T> {
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {
Expand Down

0 comments on commit 5fcd2b0

Please sign in to comment.