Skip to content

Commit

Permalink
feat(derive): Stage multiplexer (#693)
Browse files Browse the repository at this point in the history
* feat(derive): Stage multiplexer

* don't export macro

* docs

* lint

* macro hygene

* docs

* 🧹 `ChannelAssembler` + tests

* error naming

* remove holocene channel bank code

* add generics and additional fields to macro

* support sub-stage inputs

* add multiplexer error

* rebase

* tests

* channel provider tests
  • Loading branch information
clabby authored Oct 15, 2024
1 parent 4e7f4d1 commit 87eab5a
Show file tree
Hide file tree
Showing 16 changed files with 840 additions and 116 deletions.
6 changes: 4 additions & 2 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use kona_derive::{
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue,
AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{BlobProvider, OriginProvider, Signal},
Expand Down Expand Up @@ -50,7 +50,9 @@ pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
BatchQueue<
BatchStream<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
ChannelProvider<
FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>,
>,
>,
OracleL2ChainProvider<O>,
>,
Expand Down
27 changes: 19 additions & 8 deletions crates/derive/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! This module contains derivation errors thrown within the pipeline.
use crate::batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS};
use crate::{
batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS},
stages::MultiplexerError,
};
use alloc::string::String;
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
Expand Down Expand Up @@ -47,15 +50,20 @@ pub enum PipelineError {
/// [PipelineError::Eof] will be encountered.
#[error("Not enough data")]
NotEnoughData,
/// No channels are available in the [ChannelBank].
/// No channels are available in the [ChannelProvider].
///
/// [ChannelBank]: crate::stages::ChannelBank
#[error("The channel bank is empty")]
ChannelBankEmpty,
/// Failed to find channel in the [ChannelBank].
/// [ChannelProvider]: crate::stages::ChannelProvider
#[error("The channel provider is empty")]
ChannelProviderEmpty,
/// The channel has already been built by the [ChannelAssembler] stage.
///
/// [ChannelBank]: crate::stages::ChannelBank
#[error("Channel not found in channel bank")]
/// [ChannelAssembler]: crate::stages::ChannelAssembler
#[error("Channel already built")]
ChannelAlreadyBuilt,
/// Failed to find channel in the [ChannelProvider].
///
/// [ChannelProvider]: crate::stages::ChannelProvider
#[error("Channel not found in channel provider")]
ChannelNotFound,
/// No channel returned by the [ChannelReader] stage.
///
Expand Down Expand Up @@ -89,6 +97,9 @@ pub enum PipelineError {
/// [PipelineEncodingError] variant.
#[error("Decode error: {0}")]
BadEncoding(#[from] PipelineEncodingError),
/// A multiplexer stage error.
#[error("Multiplexer error: {0}")]
Multiplexer(#[from] MultiplexerError),
/// Provider error variant.
#[error("Blob provider error: {0}")]
Provider(String),
Expand Down
12 changes: 6 additions & 6 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
use super::{AttributesBuilder, DataAvailabilityProvider, DerivationPipeline};
use crate::stages::{
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
};
use alloc::sync::Arc;
use core::fmt::Debug;
Expand All @@ -14,8 +14,8 @@ use op_alloy_protocol::BlockInfo;
type L1TraversalStage<P> = L1Traversal<P>;
type L1RetrievalStage<DAP, P> = L1Retrieval<DAP, L1TraversalStage<P>>;
type FrameQueueStage<DAP, P> = FrameQueue<L1RetrievalStage<DAP, P>>;
type ChannelBankStage<DAP, P> = ChannelBank<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelBankStage<DAP, P>>;
type ChannelProviderStage<DAP, P> = ChannelProvider<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelProviderStage<DAP, P>>;
type BatchStreamStage<DAP, P, T> = BatchStream<ChannelReaderStage<DAP, P>, T>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P, T>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchQueueStage<DAP, P, T>, B>;
Expand Down Expand Up @@ -131,8 +131,8 @@ where
l1_traversal.block = Some(builder.origin.expect("origin must be set"));
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config));
let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config));
let channel_provider = ChannelProvider::new(Arc::clone(&rollup_config), frame_queue);
let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config));
let batch_stream =
BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone());
let batch_queue =
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ where
mod tests {
use super::*;
use crate::{
stages::channel_reader::BatchReader,
stages::channel::channel_reader::BatchReader,
test_utils::{CollectingLayer, TestBatchQueueProvider, TraceStorage},
};
use alloc::vec;
Expand Down
225 changes: 225 additions & 0 deletions crates/derive/src/stages/channel/channel_assembler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
//! This module contains the [ChannelAssembler] stage.
use super::{ChannelReaderProvider, NextFrameProvider};
use crate::{
pipeline::{OriginAdvancer, PipelineResult, ResettableStage},
prelude::{OriginProvider, PipelineError},
};
use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::Bytes;
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, Channel};

/// The [ChannelAssembler] stage is responsible for assembling the [Frame]s from the [FrameQueue]
/// stage into a raw compressed [Channel].
///
/// [Frame]: op_alloy_protocol::Frame
/// [FrameQueue]: crate::stages::FrameQueue
/// [Channel]: op_alloy_protocol::Channel
#[derive(Debug)]
pub struct ChannelAssembler<P>
where
P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
{
/// The rollup configuration.
pub(crate) cfg: Arc<RollupConfig>,
/// The previous stage of the derivation pipeline.
pub(crate) prev: P,
/// The current [Channel] being assembled.
pub(crate) channel: Option<Channel>,
}

impl<P> ChannelAssembler<P>
where
P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
{
/// Creates a new [ChannelAssembler] stage with the given configuration and previous stage.
pub fn new(cfg: Arc<RollupConfig>, prev: P) -> Self {
crate::set!(STAGE_RESETS, 0, &["channel-assembly"]);
Self { cfg, prev, channel: None }
}

/// Consumes [Self] and returns the previous stage.
pub fn into_prev(self) -> P {
self.prev
}

/// Returns whether or not the channel currently being assembled has timed out.
pub fn is_timed_out(&self) -> PipelineResult<bool> {
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
let is_timed_out = self
.channel
.as_ref()
.map(|c| {
c.open_block_number() + self.cfg.channel_timeout(origin.timestamp) < origin.number
})
.unwrap_or_default();

Ok(is_timed_out)
}
}

#[async_trait]
impl<P> ChannelReaderProvider for ChannelAssembler<P>
where
P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
async fn next_data(&mut self) -> PipelineResult<Option<Bytes>> {
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;

// Time out the channel if it has timed out.
if self.channel.is_some() && self.is_timed_out()? {
#[cfg(feature = "metrics")]
{
let open_block_number =
self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default();
crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64);
}
self.channel = None;
}

// Grab the next frame from the previous stage.
let next_frame = self.prev.next_frame().await?;

// Start a new channel if the frame number is 0.
if next_frame.number == 0 {
self.channel = Some(Channel::new(next_frame.id, origin));
}

if let Some(channel) = self.channel.as_mut() {
// Add the frame to the channel. If this fails, return NotEnoughData and discard the
// frame.
if channel.add_frame(next_frame, origin).is_err() {
return Err(PipelineError::NotEnoughData.temp());
}

// If the channel is ready, forward the channel to the next stage.
if channel.is_ready() {
let channel_bytes =
channel.frame_data().ok_or(PipelineError::ChannelNotFound.crit())?;

// Reset the channel and return the compressed bytes.
self.channel = None;
return Ok(Some(channel_bytes));
}
}

Err(PipelineError::NotEnoughData.temp())
}
}

#[async_trait]
impl<P> OriginAdvancer for ChannelAssembler<P>
where
P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
async fn advance_origin(&mut self) -> PipelineResult<()> {
self.prev.advance_origin().await
}
}

impl<P> OriginProvider for ChannelAssembler<P>
where
P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
{
fn origin(&self) -> Option<BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<P> ResettableStage for ChannelAssembler<P>
where
P: NextFrameProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
async fn reset(
&mut self,
block_info: BlockInfo,
system_config: &SystemConfig,
) -> PipelineResult<()> {
self.prev.reset(block_info, system_config).await?;
self.channel = None;
crate::inc!(STAGE_RESETS, &["channel-assembly"]);
Ok(())
}
}

#[cfg(test)]
mod test {
use super::ChannelAssembler;
use crate::{
prelude::PipelineError,
stages::{frame_queue::tests::new_test_frames, ChannelReaderProvider},
test_utils::TestNextFrameProvider,
};
use alloc::sync::Arc;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::BlockInfo;

#[tokio::test]
async fn test_assembler_channel_timeout() {
let frames = new_test_frames(2);
let mock = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect());
let cfg = Arc::new(RollupConfig::default());
let mut assembler = ChannelAssembler::new(cfg, mock);

// Set the origin to default block info @ block # 0.
assembler.prev.block_info = Some(BlockInfo::default());

// Read in the first frame. Since the frame isn't the last, the assembler
// should return None.
assert!(assembler.channel.is_none());
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_some());

// Push the origin forward past channel timeout.
assembler.prev.block_info =
Some(BlockInfo { number: assembler.cfg.channel_timeout(0) + 1, ..Default::default() });

// Assert that the assembler has timed out the channel.
assert!(assembler.is_timed_out().unwrap());
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_none());
}

#[tokio::test]
async fn test_assembler_non_starting_frame() {
let frames = new_test_frames(2);
let mock = TestNextFrameProvider::new(frames.into_iter().map(Ok).collect());
let cfg = Arc::new(RollupConfig::default());
let mut assembler = ChannelAssembler::new(cfg, mock);

// Send in the second frame first. This should result in no channel being created,
// and the frame being discarded.
assert!(assembler.channel.is_none());
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_none());
}

#[tokio::test]
async fn test_assembler_already_built() {
let frames = new_test_frames(2);
let mock = TestNextFrameProvider::new(frames.clone().into_iter().rev().map(Ok).collect());
let cfg = Arc::new(RollupConfig::default());
let mut assembler = ChannelAssembler::new(cfg, mock);

// Send in the first frame. This should result in a channel being created.
assert!(assembler.channel.is_none());
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_some());

// Send in a malformed second frame. This should result in an error in `add_frame`.
assembler.prev.data.push(Ok(frames[1].clone()).map(|mut f| {
f.id = Default::default();
f
}));
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_some());

// Send in the second frame again. This should return the channel bytes.
assert!(assembler.next_data().await.unwrap().is_some());
assert!(assembler.channel.is_none());
}
}
Loading

0 comments on commit 87eab5a

Please sign in to comment.