diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index c847784c1..9d7cd6052 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -264,6 +264,17 @@ mod test { data.into() } + #[tokio::test] + async fn test_reset_channel_reader() { + let mock = MockChannelReaderProvider::new(vec![Ok(None)]); + let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + reader.next_batch = Some(BatchReader::from(vec![0x00, 0x01, 0x02])); + assert!(!reader.prev.reset); + reader.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); + assert!(reader.next_batch.is_none()); + assert!(reader.prev.reset); + } + #[tokio::test] async fn test_next_batch_batch_reader_set_fails() { let mock = MockChannelReaderProvider::new(vec![Err(PipelineError::Eof.temp())]); diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs index df7461d49..8d43e75ce 100644 --- a/crates/derive/src/stages/test_utils/channel_reader.rs +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -18,12 +18,14 @@ pub struct MockChannelReaderProvider { pub data: Vec>>, /// The origin block info pub block_info: Option, + /// Tracks if the channel reader provider has been reset. + pub reset: bool, } impl MockChannelReaderProvider { /// Creates a new [MockChannelReaderProvider] with the given data. pub fn new(data: Vec>>) -> Self { - Self { data, block_info: Some(BlockInfo::default()) } + Self { data, block_info: Some(BlockInfo::default()), reset: false } } } @@ -50,6 +52,7 @@ impl ChannelReaderProvider for MockChannelReaderProvider { #[async_trait] impl ResettableStage for MockChannelReaderProvider { async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { + self.reset = true; Ok(()) } }