Skip to content

Commit

Permalink
Revert "SmallVec"
Browse files Browse the repository at this point in the history
This reverts commit ca0d6685f8c2e135c56b1d407ea5b4e74e42fed9.
  • Loading branch information
zwang28 committed Jan 3, 2025
1 parent 85a38fb commit a45de93
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ impl StreamConsumer for DispatchExecutor {
end_of_stream = true;
continue;
};
let mut barrier_batch = smallvec![];
let mut barrier_batch = vec![];
let msg: Message = msg?;
let max_peek_attempts = match msg {
Message::Chunk(c) => {
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/executor/exchange/permit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ mod tests {
use std::pin::pin;

use futures::FutureExt;
use smallvec::smallvec;

use super::*;
use crate::executor::DispatcherBarrier as Barrier;
Expand All @@ -222,7 +221,7 @@ mod tests {
let (tx, mut rx) = channel(0, 0, 1);

let send = || {
tx.send(Message::BarrierBatch(smallvec![
tx.send(Message::BarrierBatch(vec![
Barrier::with_prev_epoch_for_test(514, 114),
]))
};
Expand Down
4 changes: 1 addition & 3 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,7 @@ impl StreamConsumer for SenderConsumer {
.send(match msg {
Message::Chunk(chunk) => DispatcherMessageBatch::Chunk(chunk),
Message::Barrier(barrier) => {
DispatcherMessageBatch::BarrierBatch(smallvec![
barrier.into_dispatcher()
])
DispatcherMessageBatch::BarrierBatch(vec![barrier.into_dispatcher()])
}
Message::Watermark(watermark) => {
DispatcherMessageBatch::Watermark(watermark)
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use risingwave_pb::stream_plan::{
PbUpdateMutation, PbWatermark, ResumeMutation, SourceChangeSplitMutation, StopMutation,
SubscriptionUpstreamInfo, ThrottleMutation,
};
use smallvec::{smallvec, SmallVec};
use smallvec::SmallVec;

use crate::error::StreamResult;
use crate::task::{ActorId, FragmentId};
Expand Down Expand Up @@ -1047,24 +1047,23 @@ impl<M> MessageInner<M> {
pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

const EMPIRICAL_BARRIER_BATCH_SIZE: usize = 8;
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
Chunk(StreamChunk),
BarrierBatch(SmallVec<[BarrierInner<M>; EMPIRICAL_BARRIER_BATCH_SIZE]>),
BarrierBatch(Vec<BarrierInner<M>>),
Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = SmallVec<[DispatcherBarrier; EMPIRICAL_BARRIER_BATCH_SIZE]>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
fn from(m: DispatcherMessage) -> Self {
match m {
DispatcherMessage::Chunk(c) => Self::Chunk(c),
DispatcherMessage::Barrier(b) => Self::BarrierBatch(smallvec![b]),
DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
DispatcherMessage::Watermark(w) => Self::Watermark(w),
}
}
Expand Down

0 comments on commit a45de93

Please sign in to comment.