Skip to content

Commit 0eec4c4

Browse files
committed
fix(derive): merge upstream changes
2 parents 62f3efe + 869d485 commit 0eec4c4

20 files changed

+956
-141
lines changed

crates/derive/src/stages/batch_queue.rs

Lines changed: 372 additions & 0 deletions
Large diffs are not rendered by default.

crates/derive/src/stages/channel_bank.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,8 @@ where
5252
T: TelemetryProvider + Debug,
5353
{
5454
/// Create a new [ChannelBank] stage.
55-
pub fn new(cfg: RollupConfig, prev: FrameQueue<DAP, CP, T>, telemetry: T) -> Self {
56-
Self {
57-
cfg: Arc::new(cfg),
58-
telemetry,
59-
channels: HashMap::new(),
60-
channel_queue: VecDeque::new(),
61-
prev,
62-
}
55+
pub fn new(cfg: Arc<RollupConfig>, prev: FrameQueue<DAP, CP, T>, telemetry: T) -> Self {
56+
Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev }
6357
}
6458

6559
/// Returns the L1 origin [BlockInfo].
@@ -227,36 +221,35 @@ mod tests {
227221
use super::*;
228222
use crate::{
229223
stages::{
230-
frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval,
231-
l1_traversal::tests::new_test_traversal,
224+
frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, l1_traversal::tests::*,
232225
},
233226
traits::test_utils::{TestDAP, TestTelemetry},
234227
};
235228
use alloc::vec;
236229

237230
#[test]
238231
fn test_ingest_empty_origin() {
239-
let mut traversal = new_test_traversal(false, false);
232+
let mut traversal = new_test_traversal(vec![], vec![]);
240233
traversal.block = None;
241234
let dap = TestDAP::default();
242235
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
243236
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
244237
let mut channel_bank =
245-
ChannelBank::new(RollupConfig::default(), frame_queue, TestTelemetry::new());
238+
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
246239
let frame = Frame::default();
247240
let err = channel_bank.ingest_frame(frame).unwrap_err();
248241
assert_eq!(err, StageError::Custom(anyhow!("No origin")));
249242
}
250243

251244
#[test]
252245
fn test_ingest_and_prune_channel_bank() {
253-
let traversal = new_test_traversal(true, true);
246+
let traversal = new_populated_test_traversal();
254247
let results = vec![Ok(Bytes::from(vec![0x00]))];
255248
let dap = TestDAP { results };
256249
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
257250
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
258251
let mut channel_bank =
259-
ChannelBank::new(RollupConfig::default(), frame_queue, TestTelemetry::new());
252+
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
260253
let mut frames = new_test_frames(100000);
261254
// Ingest frames until the channel bank is full and it stops increasing in size
262255
let mut current_size = 0;
@@ -279,16 +272,16 @@ mod tests {
279272

280273
#[tokio::test]
281274
async fn test_read_empty_channel_bank() {
282-
let traversal = new_test_traversal(true, true);
275+
let traversal = new_populated_test_traversal();
283276
let results = vec![Ok(Bytes::from(vec![0x00]))];
284277
let dap = TestDAP { results };
285278
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
286279
let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new());
287280
let mut channel_bank =
288-
ChannelBank::new(RollupConfig::default(), frame_queue, TestTelemetry::new());
281+
ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new());
289282
let err = channel_bank.read().unwrap_err();
290283
assert_eq!(err, StageError::Eof);
291284
let err = channel_bank.next_data().await.unwrap_err();
292-
assert_eq!(err, StageError::Custom(anyhow!("Not Enough Data")));
285+
assert_eq!(err, StageError::NotEnoughData);
293286
}
294287
}

crates/derive/src/stages/channel_reader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::{
55
traits::{ChainProvider, DataAvailabilityProvider, LogLevel, TelemetryProvider},
66
types::{Batch, BlockInfo, StageError, StageResult},
77
};
8+
89
use alloc::vec::Vec;
910
use anyhow::anyhow;
1011
use core::fmt::Debug;

crates/derive/src/stages/frame_queue.rs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use alloc::{boxed::Box, collections::VecDeque};
1313
use anyhow::anyhow;
1414
use async_trait::async_trait;
1515

16-
/// The frame queue stage of the derivation pipeline.
16+
/// The [FrameQueue] stage of the derivation pipeline.
17+
/// This stage takes the output of the [L1Retrieval] stage and parses it into frames.
1718
#[derive(Debug)]
1819
pub struct FrameQueue<DAP, CP, T>
1920
where
@@ -35,28 +36,33 @@ where
3536
CP: ChainProvider + Debug,
3637
T: TelemetryProvider + Debug,
3738
{
38-
/// Create a new frame queue stage.
39+
/// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage.
3940
pub fn new(prev: L1Retrieval<DAP, CP, T>, telemetry: T) -> Self {
4041
Self { prev, telemetry, queue: VecDeque::new() }
4142
}
4243

43-
/// Returns the L1 origin [BlockInfo].
44+
/// Returns the L1 [BlockInfo] origin.
4445
pub fn origin(&self) -> Option<&BlockInfo> {
4546
self.prev.origin()
4647
}
4748

48-
/// Fetches the next frame from the frame queue.
49+
/// Fetches the next frame from the [FrameQueue].
4950
pub async fn next_frame(&mut self) -> StageResult<Frame> {
5051
if self.queue.is_empty() {
5152
match self.prev.next_data().await {
5253
Ok(data) => {
53-
// TODO: what do we do with frame parsing errors?
5454
if let Ok(frames) = Frame::parse_frames(data.as_ref()) {
5555
self.queue.extend(frames);
56+
} else {
57+
// TODO: log parsing frame error
58+
// Failed to parse frames, but there may be more frames in the queue for
59+
// the pipeline to advance, so don't return an error here.
5660
}
5761
}
5862
Err(e) => {
59-
return Err(anyhow!("Error fetching next data: {e}").into());
63+
// TODO: log retrieval error
64+
// The error must be bubbled up without a wrapper in case it's an EOF error.
65+
return Err(e);
6066
}
6167
}
6268
}
@@ -68,7 +74,7 @@ where
6874
),
6975
LogLevel::Debug,
7076
);
71-
return Err(anyhow!("Not enough data").into());
77+
return Err(StageError::NotEnoughData);
7278
}
7379

7480
self.queue.pop_front().ok_or_else(|| anyhow!("Frame queue is impossibly empty.").into())
@@ -92,7 +98,7 @@ where
9298
pub(crate) mod tests {
9399
use super::*;
94100
use crate::{
95-
stages::l1_traversal::tests::new_test_traversal,
101+
stages::l1_traversal::tests::new_populated_test_traversal,
96102
traits::test_utils::{TestDAP, TestTelemetry},
97103
DERIVATION_VERSION_0,
98104
};
@@ -123,71 +129,71 @@ pub(crate) mod tests {
123129
#[tokio::test]
124130
async fn test_frame_queue_empty_bytes() {
125131
let telemetry = TestTelemetry::new();
126-
let traversal = new_test_traversal(true, true);
132+
let traversal = new_populated_test_traversal();
127133
let results = vec![Ok(Bytes::from(vec![0x00]))];
128134
let dap = TestDAP { results };
129135
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
130136
let mut frame_queue = FrameQueue::new(retrieval, telemetry);
131137
let err = frame_queue.next_frame().await.unwrap_err();
132-
assert_eq!(err, anyhow!("Not enough data").into());
138+
assert_eq!(err, StageError::NotEnoughData);
133139
}
134140

135141
#[tokio::test]
136142
async fn test_frame_queue_no_frames_decoded() {
137143
let telemetry = TestTelemetry::new();
138-
let traversal = new_test_traversal(true, true);
144+
let traversal = new_populated_test_traversal();
139145
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
140146
let dap = TestDAP { results };
141147
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
142148
let mut frame_queue = FrameQueue::new(retrieval, telemetry);
143149
let err = frame_queue.next_frame().await.unwrap_err();
144-
assert_eq!(err, anyhow!("Not enough data").into());
150+
assert_eq!(err, StageError::NotEnoughData);
145151
}
146152

147153
#[tokio::test]
148154
async fn test_frame_queue_wrong_derivation_version() {
149155
let telemetry = TestTelemetry::new();
150-
let traversal = new_test_traversal(true, true);
156+
let traversal = new_populated_test_traversal();
151157
let results = vec![Ok(Bytes::from(vec![0x01]))];
152158
let dap = TestDAP { results };
153159
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
154160
let mut frame_queue = FrameQueue::new(retrieval, telemetry);
155161
let err = frame_queue.next_frame().await.unwrap_err();
156-
assert_eq!(err, anyhow!("Unsupported derivation version").into());
162+
assert_eq!(err, StageError::NotEnoughData);
157163
}
158164

159165
#[tokio::test]
160166
async fn test_frame_queue_frame_too_short() {
161167
let telemetry = TestTelemetry::new();
162-
let traversal = new_test_traversal(true, true);
168+
let traversal = new_populated_test_traversal();
163169
let results = vec![Ok(Bytes::from(vec![0x00, 0x01]))];
164170
let dap = TestDAP { results };
165171
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
166172
let mut frame_queue = FrameQueue::new(retrieval, telemetry);
167173
let err = frame_queue.next_frame().await.unwrap_err();
168-
assert_eq!(err, anyhow!("Frame too short to decode").into());
174+
assert_eq!(err, StageError::NotEnoughData);
169175
}
170176

171177
#[tokio::test]
172178
async fn test_frame_queue_single_frame() {
173179
let data = new_encoded_test_frames(1);
174180
let telemetry = TestTelemetry::new();
175-
let traversal = new_test_traversal(true, true);
181+
let traversal = new_populated_test_traversal();
176182
let dap = TestDAP { results: vec![Ok(data)] };
177183
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
178184
let mut frame_queue = FrameQueue::new(retrieval, telemetry);
179185
let frame_decoded = frame_queue.next_frame().await.unwrap();
180186
let frame = new_test_frames(1);
181187
assert_eq!(frame[0], frame_decoded);
182188
let err = frame_queue.next_frame().await.unwrap_err();
183-
assert_eq!(err, anyhow!("Not enough data").into());
189+
assert_eq!(err, StageError::Eof);
184190
}
185191

186192
#[tokio::test]
187193
async fn test_frame_queue_multiple_frames() {
188194
let telemetry = TestTelemetry::new();
189195
let data = new_encoded_test_frames(3);
190-
let traversal = new_test_traversal(true, true);
196+
let traversal = new_populated_test_traversal();
191197
let dap = TestDAP { results: vec![Ok(data)] };
192198
let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new());
193199
let mut frame_queue = FrameQueue::new(retrieval, telemetry);
@@ -196,6 +202,6 @@ pub(crate) mod tests {
196202
assert_eq!(frame_decoded.number, i);
197203
}
198204
let err = frame_queue.next_frame().await.unwrap_err();
199-
assert_eq!(err, anyhow!("Not enough data").into());
205+
assert_eq!(err, StageError::Eof);
200206
}
201207
}

crates/derive/src/stages/l1_retrieval.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ use alloy_primitives::Bytes;
1313
use anyhow::anyhow;
1414
use async_trait::async_trait;
1515

16-
/// The L1 retrieval stage of the derivation pipeline.
16+
/// The [L1Retrieval] stage of the derivation pipeline.
17+
/// For each L1 [BlockInfo] pulled from the [L1Traversal] stage,
18+
/// [L1Retrieval] fetches the associated data from a specified
19+
/// [DataAvailabilityProvider]. This data is returned as a generic
20+
/// [DataIter] that can be iterated over.
1721
#[derive(Debug)]
1822
pub struct L1Retrieval<DAP, CP, T>
1923
where
@@ -37,20 +41,20 @@ where
3741
CP: ChainProvider,
3842
T: TelemetryProvider,
3943
{
40-
/// Creates a new L1 retrieval stage with the given data availability provider and previous
41-
/// stage.
44+
/// Creates a new [L1Retrieval] stage with the previous [L1Traversal]
45+
/// stage and given [DataAvailabilityProvider].
4246
pub fn new(prev: L1Traversal<CP, T>, provider: DAP, telemetry: T) -> Self {
4347
Self { prev, telemetry, provider, data: None }
4448
}
4549

46-
/// Returns the current L1 block in the traversal stage, if it exists.
50+
/// Returns the current L1 [BlockInfo] origin from the previous
51+
/// [L1Traversal] stage.
4752
pub fn origin(&self) -> Option<&BlockInfo> {
4853
self.prev.origin()
4954
}
5055

51-
/// Retrieves the next data item from the L1 retrieval stage.
52-
/// If there is data, it pushes it into the next stage.
53-
/// If there is no data, it returns an error.
56+
/// Retrieves the next data item from the [L1Retrieval] stage.
57+
/// Returns an error if there is no data.
5458
pub async fn next_data(&mut self) -> StageResult<Bytes> {
5559
if self.data.is_none() {
5660
self.telemetry.write(
@@ -94,15 +98,15 @@ where
9498
mod tests {
9599
use super::*;
96100
use crate::{
97-
stages::l1_traversal::tests::new_test_traversal,
101+
stages::l1_traversal::tests::*,
98102
traits::test_utils::{TestDAP, TestIter, TestTelemetry},
99103
};
100104
use alloc::vec;
101105
use alloy_primitives::Address;
102106

103107
#[tokio::test]
104108
async fn test_l1_retrieval_origin() {
105-
let traversal = new_test_traversal(true, true);
109+
let traversal = new_populated_test_traversal();
106110
let dap = TestDAP { results: vec![] };
107111
let telemetry = TestTelemetry::new();
108112
let retrieval = L1Retrieval::new(traversal, dap, telemetry);
@@ -112,7 +116,7 @@ mod tests {
112116

113117
#[tokio::test]
114118
async fn test_l1_retrieval_next_data() {
115-
let traversal = new_test_traversal(true, true);
119+
let traversal = new_populated_test_traversal();
116120
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
117121
let dap = TestDAP { results };
118122
let telemetry = TestTelemetry::new();
@@ -140,8 +144,8 @@ mod tests {
140144
// Create a new traversal with no blocks or receipts.
141145
// This would bubble up an error if the prev stage
142146
// (traversal) is called in the retrieval stage.
143-
let traversal = new_test_traversal(false, false);
144147
let telemetry = TestTelemetry::new();
148+
let traversal = new_test_traversal(vec![], vec![]);
145149
let dap = TestDAP { results: vec![] };
146150
let mut retrieval =
147151
L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) };
@@ -159,7 +163,7 @@ mod tests {
159163
results: vec![Err(StageError::Eof)],
160164
};
161165
let telemetry = TestTelemetry::new();
162-
let traversal = new_test_traversal(true, true);
166+
let traversal = new_populated_test_traversal();
163167
let dap = TestDAP { results: vec![] };
164168
let mut retrieval =
165169
L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) };

0 commit comments

Comments
 (0)