Skip to content

Commit

Permalink
Add frame queue
Browse files Browse the repository at this point in the history
frame queue
  • Loading branch information
clabby committed Feb 25, 2024
1 parent dfc2f49 commit b10125d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 11 deletions.
69 changes: 66 additions & 3 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,80 @@
//! This module contains the [FrameQueue] stage of the derivation pipeline.
use super::l1_retrieval::L1Retrieval;
use crate::traits::{ChainProvider, DataAvailabilityProvider};
use alloc::collections::VecDeque;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider, ResettableStage},
types::{BlockInfo, Frame, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque};
use alloy_primitives::Bytes;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;

pub struct FrameQueue<T, DAP, CP>
where
T: Into<Bytes>,
DAP: DataAvailabilityProvider,
CP: ChainProvider,
{
/// The previous stage in the pipeline.
pub prev: L1Retrieval<T, DAP, CP>,
/// The current frame queue.
queue: VecDeque<Bytes>,
queue: VecDeque<Frame>,
}

impl<T, DAP, CP> FrameQueue<T, DAP, CP>
where
T: Into<Bytes>,
DAP: DataAvailabilityProvider,
CP: ChainProvider,
{
/// Create a new frame queue stage.
pub fn new(prev: L1Retrieval<T, DAP, CP>) -> Self {
Self {
prev,
queue: VecDeque::new(),
}
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Fetches the next frame from the frame queue.
pub async fn next_frame(&mut self) -> Result<Frame> {
if self.queue.is_empty() {
match self.prev.next_data().await {
Ok(data) => {
if let Ok(frames) = Frame::parse_frames(data.as_ref()) {
self.queue.extend(frames);
}
}
Err(e) => {
bail!("Error fetching next data: {e}")
}
}
}
// If we did not add more frames but still have more data, retry this function.
if self.queue.is_empty() {
bail!("Not enough data");
}

self.queue
.pop_front()
.ok_or_else(|| anyhow!("Frame queue is impossibly empty."))
}
}

#[async_trait]
impl<T, DAP, CP> ResettableStage for FrameQueue<T, DAP, CP>
where
T: Into<Bytes>,
DAP: DataAvailabilityProvider + Send,
CP: ChainProvider + Send,
{
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()> {
self.queue = VecDeque::default();
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/derive/src/traits/data_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ pub trait DataAvailabilityProvider {
/// Describes the behavior of a data iterator.
pub trait DataIter<T> {
/// Returns the next item in the iterator, or `None` if the iterator is exhausted.
fn next(&self) -> Option<T>;
fn next(&mut self) -> Option<T>;
}
14 changes: 7 additions & 7 deletions crates/derive/src/types/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Frame {
/// Frames are stored in L1 transactions with the following format:
/// * `data = DerivationVersion0 ++ Frame(s)`
/// Where there is one or more frames concatenated together.
fn parse_frames(encoded: &[u8]) -> Result<Vec<Self>> {
pub fn parse_frames(encoded: &[u8]) -> Result<Vec<Self>> {
if encoded.is_empty() {
bail!("No frames to parse");
}
Expand Down Expand Up @@ -120,9 +120,9 @@ mod test {
#[test]
fn test_encode_frame_roundtrip() {
let frame = Frame {
id: [1; 16],
number: 0,
data: std::vec![],
id: [0xFF; 16],
number: 0xEE,
data: std::vec![0xDD; 50],
is_last: true,
};

Expand All @@ -133,9 +133,9 @@ mod test {
#[test]
fn test_decode_many() {
let frame = Frame {
id: [1; 16],
number: 0,
data: std::vec![],
id: [0xFF; 16],
number: 0xEE,
data: std::vec![0xDD; 50],
is_last: true,
};
let mut bytes = Vec::new();
Expand Down

0 comments on commit b10125d

Please sign in to comment.