diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 3872ada12cd64..a8fcb8db2bae6 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -414,7 +414,7 @@ impl StreamConsumer for DispatchExecutor { end_of_stream = true; continue; }; - let mut barrier_batch = smallvec![]; + let mut barrier_batch = vec![]; let msg: Message = msg?; let max_peek_attempts = match msg { Message::Chunk(c) => { diff --git a/src/stream/src/executor/exchange/permit.rs b/src/stream/src/executor/exchange/permit.rs index e0ef92406670e..f12e994b2dc30 100644 --- a/src/stream/src/executor/exchange/permit.rs +++ b/src/stream/src/executor/exchange/permit.rs @@ -212,7 +212,6 @@ mod tests { use std::pin::pin; use futures::FutureExt; - use smallvec::smallvec; use super::*; use crate::executor::DispatcherBarrier as Barrier; @@ -222,7 +221,7 @@ mod tests { let (tx, mut rx) = channel(0, 0, 1); let send = || { - tx.send(Message::BarrierBatch(smallvec![ + tx.send(Message::BarrierBatch(vec![ Barrier::with_prev_epoch_for_test(514, 114), ])) }; diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 0a9587e51f969..350afe181d9e2 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -359,9 +359,7 @@ impl StreamConsumer for SenderConsumer { .send(match msg { Message::Chunk(chunk) => DispatcherMessageBatch::Chunk(chunk), Message::Barrier(barrier) => { - DispatcherMessageBatch::BarrierBatch(smallvec![ - barrier.into_dispatcher() - ]) + DispatcherMessageBatch::BarrierBatch(vec![barrier.into_dispatcher()]) } Message::Watermark(watermark) => { DispatcherMessageBatch::Watermark(watermark) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 42899971050cd..fd4b9b6480fbb 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -44,7 +44,7 @@ use risingwave_pb::stream_plan::{ PbUpdateMutation, PbWatermark, ResumeMutation, SourceChangeSplitMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, }; -use smallvec::{smallvec, SmallVec}; +use smallvec::SmallVec; use crate::error::StreamResult; use crate::task::{ActorId, FragmentId}; @@ -1047,24 +1047,23 @@ impl MessageInner { pub type Message = MessageInner; pub type DispatcherMessage = MessageInner<()>; -const EMPIRICAL_BARRIER_BATCH_SIZE: usize = 8; /// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them. /// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message. #[derive(Debug, EnumAsInner, PartialEq, Clone)] pub enum MessageBatchInner { Chunk(StreamChunk), - BarrierBatch(SmallVec<[BarrierInner; EMPIRICAL_BARRIER_BATCH_SIZE]>), + BarrierBatch(Vec>), Watermark(Watermark), } pub type MessageBatch = MessageBatchInner; -pub type DispatcherBarriers = SmallVec<[DispatcherBarrier; EMPIRICAL_BARRIER_BATCH_SIZE]>; +pub type DispatcherBarriers = Vec; pub type DispatcherMessageBatch = MessageBatchInner<()>; impl From for DispatcherMessageBatch { fn from(m: DispatcherMessage) -> Self { match m { DispatcherMessage::Chunk(c) => Self::Chunk(c), - DispatcherMessage::Barrier(b) => Self::BarrierBatch(smallvec![b]), + DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]), DispatcherMessage::Watermark(w) => Self::Watermark(w), } }