Skip to content

Commit

Permalink
fix(derive): channel bank testing with spinlocked primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Apr 4, 2024
1 parent df76da5 commit 382d8a5
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 18 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ serde = { version = "1.0.197", default-features = false, features = ["derive"],
[dev-dependencies]
tokio = { version = "1.36", features = ["full"] }
proptest = "1.4.0"
spin = { version = "0.9.8", features = ["mutex"] } # Spin is used for testing synchronization primitives

[features]
serde = ["dep:serde", "alloy-primitives/serde"]
2 changes: 1 addition & 1 deletion crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ where
DAP: DataAvailabilityProvider + Send + Debug,
CP: ChainProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
T: TelemetryProvider + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn reset(&mut self, base: BlockInfo, _: SystemConfig) -> StageResult<()> {
// Copy over the Origin from the next stage.
Expand Down
35 changes: 25 additions & 10 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ where
/// The rollup configuration.
cfg: Arc<RollupConfig>,
/// Telemetry
telemetry: T,
telemetry: Arc<T>,
/// Map of channels by ID.
channels: HashMap<ChannelID, Channel>,
/// Channels in FIFO order.
Expand All @@ -52,7 +52,7 @@ where
T: TelemetryProvider + Debug,
{
/// Create a new [ChannelBank] stage.
pub fn new(cfg: Arc<RollupConfig>, prev: FrameQueue<DAP, CP, T>, telemetry: T) -> Self {
pub fn new(cfg: Arc<RollupConfig>, prev: FrameQueue<DAP, CP, T>, telemetry: Arc<T>) -> Self {
Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev }
}

Expand Down Expand Up @@ -207,7 +207,7 @@ impl<DAP, CP, T> ResettableStage for ChannelBank<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Send + Debug,
CP: ChainProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
T: TelemetryProvider + Send + Sync + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
self.channels.clear();
Expand All @@ -234,8 +234,12 @@ mod tests {
let dap = TestDAP::default();
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
let mut channel_bank =
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
let telemetry = Arc::new(TestTelemetry::new());
let mut channel_bank = ChannelBank::new(
Arc::new(RollupConfig::default()),
frame_queue,
Arc::clone(&telemetry),
);
let frame = Frame::default();
let err = channel_bank.ingest_frame(frame).unwrap_err();
assert_eq!(err, StageError::MissingOrigin);
Expand All @@ -247,18 +251,21 @@ mod tests {
let dap = TestDAP::default();
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
let telem = Arc::new(TestTelemetry::new());
let mut channel_bank =
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, Arc::clone(&telem));
let frame = Frame { id: [0xFF; 16], ..Default::default() };
assert_eq!(channel_bank.size(), 0);
assert!(channel_bank.channels.is_empty());
assert_eq!(telem.count_calls(LogLevel::Warning), 0);
assert_eq!(channel_bank.ingest_frame(frame.clone()), Ok(()));
assert_eq!(channel_bank.size(), crate::params::FRAME_OVERHEAD);
assert_eq!(channel_bank.channels.len(), 1);
// This should fail since the frame is already ingested.
assert_eq!(channel_bank.ingest_frame(frame), Ok(()));
assert_eq!(channel_bank.size(), crate::params::FRAME_OVERHEAD);
assert_eq!(channel_bank.channels.len(), 1);
assert_eq!(telem.count_calls(LogLevel::Warning), 1);
}

#[test]
Expand All @@ -268,8 +275,12 @@ mod tests {
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
let mut channel_bank =
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
let telemetry = Arc::new(TestTelemetry::new());
let mut channel_bank = ChannelBank::new(
Arc::new(RollupConfig::default()),
frame_queue,
Arc::clone(&telemetry),
);
let mut frames = new_test_frames(100000);
// Ingest frames until the channel bank is full and it stops increasing in size
let mut current_size = 0;
Expand Down Expand Up @@ -297,8 +308,12 @@ mod tests {
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
let mut channel_bank =
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
let telemetry = Arc::new(TestTelemetry::new());
let mut channel_bank = ChannelBank::new(
Arc::new(RollupConfig::default()),
frame_queue,
Arc::clone(&telemetry),
);
let err = channel_bank.read().unwrap_err();
assert_eq!(err, StageError::Eof);
let err = channel_bank.next_data().await.unwrap_err();
Expand Down
25 changes: 18 additions & 7 deletions crates/derive/src/traits/test_utils/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
//! Test Utilities for Telemetry
use crate::traits::{LogLevel, TelemetryProvider};
use alloc::{rc::Rc, vec::Vec};
use alloc::{sync::Arc, vec::Vec};
use alloy_primitives::Bytes;
use core::cell::RefCell;
use spin::mutex::Mutex;

/// Mock telemetry provider
#[derive(Debug, Default)]
pub struct TestTelemetry {
/// Holds telemetry data with log levels for assertions.
pub(crate) telemetry_calls: Rc<RefCell<Vec<(Bytes, LogLevel)>>>,
pub(crate) telemetry_calls: Arc<Mutex<Vec<(Bytes, LogLevel)>>>,
}

impl TestTelemetry {
/// Creates a new [TestTelemetry] instance.
pub fn new() -> Self {
Self::default()
}

/// Checks the existance of a given ([Bytes], [LogLevel]) call.
pub fn exists(&self, data: Bytes, level: LogLevel) -> bool {
let guard = self.telemetry_calls.lock();
guard.iter().filter(|(d, l)| *d == data && *l == level).count() > 0
}

/// Counts the number of telemetry calls with the given [LogLevel].
pub fn count_calls(&self, level: LogLevel) -> usize {
let guard = self.telemetry_calls.lock();
guard.iter().filter(|(_, l)| *l == level).count()
}
}

impl TelemetryProvider for TestTelemetry {
fn write<I: Into<alloy_primitives::Bytes>>(&self, data: I, level: LogLevel) {
let data = (data.into(), level);
{
let mut calls = self.telemetry_calls.borrow_mut();
(*calls).push(data);
}
let binding = self.telemetry_calls.clone();
let mut guard = binding.lock();
guard.push(data);
}
}

0 comments on commit 382d8a5

Please sign in to comment.