diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index f1417f4e2..95cac8de6 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -8,7 +8,7 @@ use hashbrown::HashMap; use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, traits::{ChainProvider, DataAvailabilityProvider}, - types::{BlockInfo, Channel, RollupConfig}, + types::{BlockInfo, Channel, Frame, RollupConfig}, }; use super::l1_retrieval::L1Retrieval; @@ -81,4 +81,28 @@ where } Ok(()) } + + /// Adds new L1 data to the channel bank. Should only be called after all data has been read. + pub fn ingest_frame(&mut self, frame: Frame) -> Result<()> { + let origin = *self.origin().ok_or(anyhow!("No origin"))?; + + let current_channel = self.channels.entry(frame.id).or_insert_with(|| { + // Create a new channel + let channel = Channel::new(frame.id, origin); + self.channel_queue.push_back(frame.id); + channel + }); + + // Check if the channel is not timed out. If it has, ignore the frame. + if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number { + return Ok(()); + } + + // Ingest the frame. If it fails, ignore the frame. + if current_channel.add_frame(frame, origin).is_err() { + return Ok(()); + } + + self.prune() + } }