Skip to content

Commit

Permalink
feat(derive): Span Batch Validation (#121)
Browse files Browse the repository at this point in the history
* feat(derive): span batch validation

* feat(derive): span batch validity unit tests

* feat(derive): span batch unit tests with acceptance test

* fix(derive): unit tests

* fix(derive): add more unit tests

* feat(derive): span batch validity unit tests for txs
  • Loading branch information
refcell authored Apr 19, 2024
1 parent 0587b4d commit 5a60f8d
Show file tree
Hide file tree
Showing 6 changed files with 1,385 additions and 23 deletions.
23 changes: 14 additions & 9 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ where
/// Follows the validity rules imposed on consecutive batches.
/// Based on currently available buffered batch and L1 origin information.
/// A [StageError::Eof] is returned if no batch can be derived yet.
pub fn derive_next_batch(&mut self, empty: bool, parent: L2BlockInfo) -> StageResult<Batch> {
pub async fn derive_next_batch(
&mut self,
empty: bool,
parent: L2BlockInfo,
) -> StageResult<Batch> {
// Cannot derive a batch if no origin was prepared.
if self.l1_blocks.is_empty() {
return Err(StageError::MissingOrigin);
Expand Down Expand Up @@ -140,7 +144,8 @@ where
let mut remaining = Vec::new();
for i in 0..self.batches.len() {
let batch = &self.batches[i];
let validity = batch.check_batch(&self.cfg, &self.l1_blocks, parent, &self.fetcher);
let validity =
batch.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await;
match validity {
BatchValidity::Future => {
remaining.push(batch.clone());
Expand Down Expand Up @@ -221,15 +226,15 @@ where
}

/// Adds a batch to the queue.
pub fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> {
pub async fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> {
if self.l1_blocks.is_empty() {
error!("Cannot add batch without an origin");
panic!("Cannot add batch without an origin");
}
let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?;
let data = BatchWithInclusionBlock { inclusion_block: origin, batch };
// If we drop the batch, validation logs the drop reason with WARN level.
if data.check_batch(&self.cfg, &self.l1_blocks, parent, &self.fetcher).is_drop() {
if data.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await.is_drop() {
return Ok(());
}
self.batches.push(data);
Expand Down Expand Up @@ -310,7 +315,7 @@ where
match self.prev.next_batch().await {
Ok(b) => {
if !origin_behind {
self.add_batch(b, parent).ok();
self.add_batch(b, parent).await.ok();
} else {
warn!("Dropping batch: Origin is behind");
}
Expand All @@ -329,7 +334,7 @@ where
}

// Attempt to derive more batches.
let batch = match self.derive_next_batch(out_of_data, parent) {
let batch = match self.derive_next_batch(out_of_data, parent).await {
Ok(b) => b,
Err(e) => match e {
StageError::Eof => {
Expand Down Expand Up @@ -418,15 +423,15 @@ mod tests {
BatchReader::from(compressed)
}

#[test]
fn test_derive_next_batch_missing_origin() {
#[tokio::test]
async fn test_derive_next_batch_missing_origin() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let cfg = Arc::new(RollupConfig::default());
let mock = MockBatchQueueProvider::new(data);
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let parent = L2BlockInfo::default();
let result = bq.derive_next_batch(false, parent).unwrap_err();
let result = bq.derive_next_batch(false, parent).await.unwrap_err();
assert_eq!(result, StageError::MissingOrigin);
}

Expand Down
26 changes: 24 additions & 2 deletions crates/derive/src/stages/test_utils/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,29 @@ use tracing::{Event, Level, Subscriber};
use tracing_subscriber::{layer::Context, Layer};

/// The storage for the collected traces.
pub type TraceStorage = Arc<Mutex<Vec<(Level, String)>>>;
#[derive(Debug, Default, Clone)]
pub struct TraceStorage(pub Arc<Mutex<Vec<(Level, String)>>>);

impl TraceStorage {
/// Returns the items in the storage that match the specified level.
pub fn get_by_level(&self, level: Level) -> Vec<String> {
self.0
.lock()
.iter()
.filter_map(|(l, message)| if *l == level { Some(message.clone()) } else { None })
.collect()
}

/// Locks the storage and returns the items.
pub fn lock(&self) -> spin::MutexGuard<'_, Vec<(Level, String)>> {
self.0.lock()
}

/// Returns if the storage is empty.
pub fn is_empty(&self) -> bool {
self.0.lock().is_empty()
}
}

#[derive(Debug, Default)]
pub struct CollectingLayer {
Expand All @@ -26,7 +48,7 @@ impl<S: Subscriber> Layer<S> for CollectingLayer {
let level = *metadata.level();
let message = format!("{:?}", event);

let mut storage = self.storage.lock();
let mut storage = self.storage.0.lock();
storage.push((level, message));
}
}
8 changes: 5 additions & 3 deletions crates/derive/src/types/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ impl BatchWithInclusionBlock {
/// One or more consecutive l1_blocks should be provided.
/// In case of only a single L1 block, the decision whether a batch is valid may have to stay
/// undecided.
pub fn check_batch<BF: L2ChainProvider>(
pub async fn check_batch<BF: L2ChainProvider>(
&self,
cfg: &RollupConfig,
l1_blocks: &[BlockInfo],
l2_safe_head: L2BlockInfo,
fetcher: &BF,
fetcher: &mut BF,
) -> BatchValidity {
match &self.batch {
Batch::Single(single_batch) => {
single_batch.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block)
}
Batch::Span(span_batch) => {
span_batch.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block, fetcher)
span_batch
.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block, fetcher)
.await
}
}
}
Expand Down
Loading

0 comments on commit 5a60f8d

Please sign in to comment.