Skip to content

Commit

Permalink
Merge pull request #75 from ethereum-optimism/refcell/telemetry
Browse files Browse the repository at this point in the history
feat(derive): Portable Telemetry
  • Loading branch information
refcell authored Apr 4, 2024
2 parents 869d485 + 0eec4c4 commit 50a4410
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 71 deletions.
18 changes: 12 additions & 6 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
use crate::{
stages::channel_reader::ChannelReader,
traits::{ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher},
traits::{
ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher,
TelemetryProvider,
},
types::{
Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig,
SingleBatch, StageError, StageResult, SystemConfig,
Expand All @@ -28,16 +31,17 @@ use core::fmt::Debug;
/// It is internally responsible for making sure that batches with L1 inclusions block outside it's
/// working range are not considered or pruned.
#[derive(Debug)]
pub struct BatchQueue<DAP, CP, BF>
pub struct BatchQueue<DAP, CP, BF, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
{
/// The rollup config.
cfg: RollupConfig,
/// The previous stage of the derivation pipeline.
prev: ChannelReader<DAP, CP>,
prev: ChannelReader<DAP, CP, T>,
/// The l1 block ref
origin: Option<BlockInfo>,

Expand All @@ -59,14 +63,15 @@ where
fetcher: BF,
}

impl<DAP, CP, BF> BatchQueue<DAP, CP, BF>
impl<DAP, CP, BF, T> BatchQueue<DAP, CP, BF, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
{
/// Creates a new [BatchQueue] stage.
pub fn new(cfg: RollupConfig, prev: ChannelReader<DAP, CP>, fetcher: BF) -> Self {
pub fn new(cfg: RollupConfig, prev: ChannelReader<DAP, CP, T>, fetcher: BF) -> Self {
Self {
cfg,
prev,
Expand Down Expand Up @@ -344,11 +349,12 @@ where
}

#[async_trait]
impl<DAP, CP, BF> ResettableStage for BatchQueue<DAP, CP, BF>
impl<DAP, CP, BF, T> ResettableStage for BatchQueue<DAP, CP, BF, T>
where
DAP: DataAvailabilityProvider + Send + Debug,
CP: ChainProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
T: TelemetryProvider + Send + Debug,
{
async fn reset(&mut self, base: BlockInfo, _: SystemConfig) -> StageResult<()> {
// Copy over the Origin from the next stage.
Expand Down
62 changes: 45 additions & 17 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
use super::frame_queue::FrameQueue;
use crate::{
params::{ChannelID, MAX_CHANNEL_BANK_SIZE},
traits::{ChainProvider, DataAvailabilityProvider, ResettableStage},
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider,
},
types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
Expand All @@ -25,29 +27,33 @@ use hashbrown::HashMap;
/// to `IngestData`. This means that we can do an ingest and then do a read while becoming too
/// large. [ChannelBank] buffers channel frames, and emits full channel data
#[derive(Debug)]
pub struct ChannelBank<DAP, CP>
pub struct ChannelBank<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
/// The rollup configuration.
cfg: Arc<RollupConfig>,
/// Telemetry
telemetry: T,
/// Map of channels by ID.
channels: HashMap<ChannelID, Channel>,
/// Channels in FIFO order.
channel_queue: VecDeque<ChannelID>,
/// The previous stage of the derivation pipeline.
prev: FrameQueue<DAP, CP>,
prev: FrameQueue<DAP, CP, T>,
}

impl<DAP, CP> ChannelBank<DAP, CP>
impl<DAP, CP, T> ChannelBank<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
/// Create a new [ChannelBank] stage.
pub fn new(cfg: Arc<RollupConfig>, prev: FrameQueue<DAP, CP>) -> Self {
Self { cfg, channels: HashMap::new(), channel_queue: VecDeque::new(), prev }
pub fn new(cfg: Arc<RollupConfig>, prev: FrameQueue<DAP, CP, T>, telemetry: T) -> Self {
Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev }
}

/// Returns the L1 origin [BlockInfo].
Expand Down Expand Up @@ -85,11 +91,23 @@ where

// Check if the channel is not timed out. If it has, ignore the frame.
if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number {
self.telemetry.write(
alloy_primitives::Bytes::from(alloc::format!("Channel {:?} timed out", frame.id)),
LogLevel::Warning,
);
return Ok(());
}

// Ingest the frame. If it fails, ignore the frame.
let frame_id = frame.id;
if current_channel.add_frame(frame, origin).is_err() {
self.telemetry.write(
alloy_primitives::Bytes::from(alloc::format!(
"Failed to add frame to channel: {:?}",
frame_id
)),
LogLevel::Warning,
);
return Ok(());
}

Expand All @@ -102,6 +120,8 @@ where
pub fn read(&mut self) -> StageResult<Option<Bytes>> {
// Bail if there are no channels to read from.
if self.channel_queue.is_empty() {
self.telemetry
.write(alloy_primitives::Bytes::from("No channels to read from"), LogLevel::Debug);
return Err(StageError::Eof);
}

Expand All @@ -113,6 +133,10 @@ where

// Remove all timed out channels from the front of the `channel_queue`.
if channel.open_block_number() + self.cfg.channel_timeout < origin.number {
self.telemetry.write(
alloy_primitives::Bytes::from(alloc::format!("Channel {:?} timed out", first)),
LogLevel::Warning,
);
self.channels.remove(&first);
self.channel_queue.pop_front();
return Ok(None);
Expand Down Expand Up @@ -179,10 +203,11 @@ where
}

#[async_trait]
impl<DAP, CP> ResettableStage for ChannelBank<DAP, CP>
impl<DAP, CP, T> ResettableStage for ChannelBank<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Send + Debug,
CP: ChainProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
self.channels.clear();
Expand All @@ -198,7 +223,7 @@ mod tests {
stages::{
frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, l1_traversal::tests::*,
},
traits::test_utils::TestDAP,
traits::test_utils::{TestDAP, TestTelemetry},
};
use alloc::vec;

Expand All @@ -207,9 +232,10 @@ mod tests {
let mut traversal = new_test_traversal(vec![], vec![]);
traversal.block = None;
let dap = TestDAP::default();
let retrieval = L1Retrieval::new(traversal, dap);
let frame_queue = FrameQueue::new(retrieval);
let mut channel_bank = ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue);
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
let mut channel_bank =
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
let frame = Frame::default();
let err = channel_bank.ingest_frame(frame).unwrap_err();
assert_eq!(err, StageError::Custom(anyhow!("No origin")));
Expand All @@ -220,9 +246,10 @@ mod tests {
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
let frame_queue = FrameQueue::new(retrieval);
let mut channel_bank = ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue);
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
let mut channel_bank =
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
let mut frames = new_test_frames(100000);
// Ingest frames until the channel bank is full and it stops increasing in size
let mut current_size = 0;
Expand All @@ -248,9 +275,10 @@ mod tests {
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
let frame_queue = FrameQueue::new(retrieval);
let mut channel_bank = ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue);
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
let mut channel_bank =
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
let err = channel_bank.read().unwrap_err();
assert_eq!(err, StageError::Eof);
let err = channel_bank.next_data().await.unwrap_err();
Expand Down
18 changes: 12 additions & 6 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use super::channel_bank::ChannelBank;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider},
traits::{ChainProvider, DataAvailabilityProvider, LogLevel, TelemetryProvider},
types::{Batch, BlockInfo, StageError, StageResult},
};

Expand All @@ -13,30 +13,36 @@ use miniz_oxide::inflate::decompress_to_vec_zlib;

/// [ChannelReader] is a stateful stage that does the following:
#[derive(Debug)]
pub struct ChannelReader<DAP, CP>
pub struct ChannelReader<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
/// The previous stage of the derivation pipeline.
prev: ChannelBank<DAP, CP>,
prev: ChannelBank<DAP, CP, T>,
/// Telemetry
telemetry: T,
/// The batch reader.
next_batch: Option<BatchReader>,
}

impl<DAP, CP> ChannelReader<DAP, CP>
impl<DAP, CP, T> ChannelReader<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
/// Create a new [ChannelReader] stage.
pub fn new(prev: ChannelBank<DAP, CP>) -> Self {
Self { prev, next_batch: None }
pub fn new(prev: ChannelBank<DAP, CP, T>, telemetry: T) -> Self {
Self { prev, telemetry, next_batch: None }
}

/// Pulls out the next Batch from the available channel.
pub async fn next_batch(&mut self) -> StageResult<Batch> {
if let Err(e) = self.set_batch_reader().await {
self.telemetry
.write(alloc::format!("Failed to set batch reader: {:?}", e), LogLevel::Error);
self.next_channel();
return Err(e);
}
Expand Down
Loading

0 comments on commit 50a4410

Please sign in to comment.