From 0a1ca04a53d2c5350449dd7bfbde7a4f33aa4ee4 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 8 Oct 2024 17:07:48 -0400 Subject: [PATCH] chore(derive): test channel reader flushing (#661) --- crates/derive/src/stages/channel_reader.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 9d7cd6052..5866b69a5 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -264,6 +264,15 @@ mod test { data.into() } + #[tokio::test] + async fn test_flush_channel_reader() { + let mock = MockChannelReaderProvider::new(vec![Ok(Some(new_compressed_batch_data()))]); + let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + reader.next_batch = Some(BatchReader::from(new_compressed_batch_data())); + reader.flush_channel().await.unwrap(); + assert!(reader.next_batch.is_none()); + } + #[tokio::test] async fn test_reset_channel_reader() { let mock = MockChannelReaderProvider::new(vec![Ok(None)]);