From 1bae07f3bae57236b2abdb4108c950f84539ba7c Mon Sep 17 00:00:00 2001 From: clabby Date: Sat, 24 Feb 2024 19:34:41 -0700 Subject: [PATCH] Add frame queue frame queue --- crates/derive/src/stages/frame_queue.rs | 69 +++++++++++++++++++++++-- crates/derive/src/types/frame.rs | 14 ++--- 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 4bfc13d3b..be8e4b769 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -1,17 +1,80 @@ //! This module contains the [FrameQueue] stage of the derivation pipeline. use super::l1_retrieval::L1Retrieval; -use crate::traits::{ChainProvider, DataAvailabilityProvider}; -use alloc::collections::VecDeque; +use crate::{ + traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, + types::{BlockInfo, Frame, SystemConfig}, +}; +use alloc::{boxed::Box, collections::VecDeque}; use alloy_primitives::Bytes; +use anyhow::{anyhow, bail, Result}; +use async_trait::async_trait; pub struct FrameQueue where + T: Into, DAP: DataAvailabilityProvider, CP: ChainProvider, { /// The previous stage in the pipeline. pub prev: L1Retrieval, /// The current frame queue. - queue: VecDeque, + queue: VecDeque, +} + +impl FrameQueue +where + T: Into, + DAP: DataAvailabilityProvider, + CP: ChainProvider, +{ + /// Create a new frame queue stage. + pub fn new(prev: L1Retrieval) -> Self { + Self { + prev, + queue: VecDeque::new(), + } + } + + /// Returns the L1 origin [BlockInfo]. + pub fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } + + /// Fetches the next frame from the frame queue. + pub async fn next_frame(&mut self) -> Result { + if self.queue.is_empty() { + match self.prev.next_data().await { + Ok(data) => { + if let Ok(frames) = Frame::parse_frames(data.as_ref()) { + self.queue.extend(frames); + } + } + Err(e) => { + bail!("Error fetching next data: {e}") + } + } + } + // If we did not add more frames but still have more data, retry this function. + if self.queue.is_empty() { + bail!("Not enough data"); + } + + self.queue + .pop_front() + .ok_or_else(|| anyhow!("Frame queue is impossibly empty.")) + } +} + +#[async_trait] +impl ResettableStage for FrameQueue +where + T: Into, + DAP: DataAvailabilityProvider + Send, + CP: ChainProvider + Send, +{ + async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()> { + self.queue = VecDeque::default(); + Ok(()) + } } diff --git a/crates/derive/src/types/frame.rs b/crates/derive/src/types/frame.rs index ae8a0195e..3c7fae171 100644 --- a/crates/derive/src/types/frame.rs +++ b/crates/derive/src/types/frame.rs @@ -83,7 +83,7 @@ impl Frame { /// Frames are stored in L1 transactions with the following format: /// * `data = DerivationVersion0 ++ Frame(s)` /// Where there is one or more frames concatenated together. - fn parse_frames(encoded: &[u8]) -> Result> { + pub fn parse_frames(encoded: &[u8]) -> Result> { if encoded.is_empty() { bail!("No frames to parse"); } @@ -120,9 +120,9 @@ mod test { #[test] fn test_encode_frame_roundtrip() { let frame = Frame { - id: [1; 16], - number: 0, - data: std::vec![], + id: [0xFF; 16], + number: 0xEE, + data: std::vec![0xDD; 50], is_last: true, }; @@ -133,9 +133,9 @@ mod test { #[test] fn test_decode_many() { let frame = Frame { - id: [1; 16], - number: 0, - data: std::vec![], + id: [0xFF; 16], + number: 0xEE, + data: std::vec![0xDD; 50], is_last: true, }; let mut bytes = Vec::new();