From 997af794ab789a4d475acfa82a8413fc8b79a450 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 24 Dec 2024 14:50:47 +0800 Subject: [PATCH] refactor(stream): send barriers in batch when possible --- proto/stream_plan.proto | 12 + proto/task_service.proto | 2 +- .../src/rpc/service/exchange_service.rs | 4 +- src/stream/src/executor/dispatch.rs | 216 ++++++++++++------ src/stream/src/executor/exchange/input.rs | 31 ++- src/stream/src/executor/exchange/output.rs | 2 +- src/stream/src/executor/exchange/permit.rs | 11 +- src/stream/src/executor/integration_tests.rs | 8 +- src/stream/src/executor/mod.rs | 81 ++++--- .../src/executor/monitor/streaming_stats.rs | 9 + 10 files changed, 259 insertions(+), 117 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 8aabadfd2b74d..e34df875bdd6c 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -175,6 +175,18 @@ message StreamMessage { } } +message StreamMessageBatch { + message BarrierBatch { + repeated Barrier barriers = 1; + } + oneof stream_message_batch { + data.StreamChunk stream_chunk = 1; + BarrierBatch barrier_batch = 2; + Watermark watermark = 3; + } +} + + // Hash mapping for compute node. Stores mapping from virtual node to actor id. message ActorMapping { repeated uint32 original_indices = 1; diff --git a/proto/task_service.proto b/proto/task_service.proto index cb14ee809d943..c163d2175b27b 100644 --- a/proto/task_service.proto +++ b/proto/task_service.proto @@ -107,7 +107,7 @@ message GetStreamRequest { } message GetStreamResponse { - stream_plan.StreamMessage message = 1; + stream_plan.StreamMessageBatch message = 1; // The number of permits acquired for this message, which should be sent back to the upstream with `add_permits`. // In theory, this can also be derived from the message itself by the receiver. Here we send it explicitly to // avoid any sense of inconsistency for the derivation logic, so the receiver can simply send it back verbatim. diff --git a/src/compute/src/rpc/service/exchange_service.rs b/src/compute/src/rpc/service/exchange_service.rs index 7e76099edc3ab..18cf8b19185fc 100644 --- a/src/compute/src/rpc/service/exchange_service.rs +++ b/src/compute/src/rpc/service/exchange_service.rs @@ -25,7 +25,6 @@ use risingwave_pb::task_service::{ permits, GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits, }; use risingwave_stream::executor::exchange::permit::{MessageWithPermits, Receiver}; -use risingwave_stream::executor::DispatcherMessage; use risingwave_stream::task::LocalStreamManager; use thiserror_ext::AsReport; use tokio_stream::wrappers::ReceiverStream; @@ -172,13 +171,14 @@ impl ExchangeServiceImpl { permits.add_permits(permits_to_add); } Either::Right(MessageWithPermits { message, permits }) => { + use risingwave_stream::executor::DispatcherMessageBatch; let proto = message.to_protobuf(); // forward the acquired permit to the downstream let response = GetStreamResponse { message: Some(proto), permits: Some(PbPermits { value: permits }), }; - let bytes = DispatcherMessage::get_encoded_len(&response); + let bytes = DispatcherMessageBatch::get_encoded_len(&response); yield response; diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 90e6ef9592194..5f5a59481d989 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -18,7 +18,7 @@ use std::iter::repeat_with; use std::ops::{Deref, DerefMut}; use std::time::Duration; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::bitmap::BitmapBuilder; @@ -29,11 +29,13 @@ use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate; use risingwave_pb::stream_plan::PbDispatcher; use smallvec::{smallvec, SmallVec}; use tokio::time::Instant; +use tokio_stream::StreamExt; use tracing::{event, Instrument}; use super::exchange::output::{new_output, BoxedOutput}; use super::{ - AddMutation, DispatcherBarrier, DispatcherMessage, TroublemakerExecutor, UpdateMutation, + AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, + TroublemakerExecutor, UpdateMutation, }; use crate::executor::prelude::*; use crate::executor::StreamConsumer; @@ -110,56 +112,63 @@ struct DispatchExecutorInner { } impl DispatchExecutorInner { - async fn dispatch(&mut self, msg: Message) -> StreamResult<()> { + async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> { let limit = (self.context.config.developer).exchange_concurrent_dispatchers; - + // Only barrier can be batched for now. match msg { - Message::Watermark(watermark) => { + MessageBatch::BarrierBatch(barrier_batch) => { + if barrier_batch.is_empty() { + return Ok(()); + } + // Only the first barrier in a batch can be mutation. + let mutation = barrier_batch[0].mutation.clone(); + self.pre_mutate_dispatchers(&mutation)?; futures::stream::iter(self.dispatchers.iter_mut()) .map(Ok) .try_for_each_concurrent(limit, |dispatcher| async { let start_time = Instant::now(); - dispatcher.dispatch_watermark(watermark.clone()).await?; + dispatcher + .dispatch_barriers( + barrier_batch + .iter() + .cloned() + .map(|b| b.into_dispatcher()) + .collect(), + ) + .await?; dispatcher.record_output_buffer_blocking_duration(start_time.elapsed()); StreamResult::Ok(()) }) .await?; + self.post_mutate_dispatchers(&mutation)?; } - Message::Chunk(chunk) => { + MessageBatch::Watermark(watermark) => { futures::stream::iter(self.dispatchers.iter_mut()) .map(Ok) .try_for_each_concurrent(limit, |dispatcher| async { let start_time = Instant::now(); - dispatcher.dispatch_data(chunk.clone()).await?; + dispatcher.dispatch_watermark(watermark.clone()).await?; dispatcher.record_output_buffer_blocking_duration(start_time.elapsed()); StreamResult::Ok(()) }) .await?; - - self.metrics - .actor_out_record_cnt - .inc_by(chunk.cardinality() as _); } - Message::Barrier(barrier) => { - let mutation = barrier.mutation.clone(); - self.pre_mutate_dispatchers(&mutation)?; - + MessageBatch::Chunk(chunk) => { futures::stream::iter(self.dispatchers.iter_mut()) .map(Ok) .try_for_each_concurrent(limit, |dispatcher| async { let start_time = Instant::now(); - dispatcher - .dispatch_barrier(barrier.clone().into_dispatcher()) - .await?; + dispatcher.dispatch_data(chunk.clone()).await?; dispatcher.record_output_buffer_blocking_duration(start_time.elapsed()); StreamResult::Ok(()) }) .await?; - self.post_mutate_dispatchers(&mutation)?; + self.metrics + .actor_out_record_cnt + .inc_by(chunk.cardinality() as _); } - }; - + } Ok(()) } @@ -389,37 +398,82 @@ impl StreamConsumer for DispatchExecutor { type BarrierStream = impl Stream> + Send; fn execute(mut self: Box) -> Self::BarrierStream { + // TODO: use config + let barrier_batch_size = std::env::var("RW_ENABLE_BATCH_BARRIER_SIZE") + .map(|s| s.parse::().unwrap()) + .unwrap_or(1); #[try_stream] async move { - let input = self.input.execute(); - - #[for_await] - for msg in input { - let msg: Message = msg?; - let (barrier, span, tracing_span) = match msg { - Message::Chunk(_) => ( - None, - "dispatch_chunk", - tracing::info_span!("dispatch_chunk"), - ), - Message::Barrier(ref barrier) => ( - Some(barrier.clone()), - "dispatch_barrier", - tracing::info_span!("dispatch_barrier"), - ), - Message::Watermark(_) => ( - None, - "dispatch_watermark", - tracing::info_span!("dispatch_watermark"), - ), + let mut input = self.input.execute().peekable(); + let mut end_of_stream = false; + while !end_of_stream { + let Some(msg) = input.next().await else { + end_of_stream = true; + continue; }; + let mut barrier_batch = vec![]; + let msg: Message = msg?; + let end_batch; + if let Message::Barrier(ref barrier) = msg { + end_batch = barrier.mutation.is_some(); + barrier_batch.push(barrier.clone()); + } else { + let (msg, span, tracing_span) = match msg { + Message::Chunk(c) => ( + MessageBatch::Chunk(c), + "dispatch_chunk", + tracing::info_span!("dispatch_chunk"), + ), + Message::Watermark(w) => ( + MessageBatch::Watermark(w), + "dispatch_watermark", + tracing::info_span!("dispatch_watermark"), + ), + Message::Barrier(_) => unreachable!(""), + }; + self.inner + .dispatch(msg) + .instrument(tracing_span) + .instrument_await(span) + .await?; + continue; + } + // Try to peek more consecutive non-mutation barriers. + let b = if end_batch { 0 } else { barrier_batch_size }; + for _ in 0..b { + let peek = input.peek().now_or_never(); + let Some(peek) = peek else { + break; + }; + let Some(msg) = peek else { + end_of_stream = true; + break; + }; + let Ok(Message::Barrier(barrier)) = msg else { + break; + }; + if barrier.mutation.is_some() { + break; + } + let msg: Message = input.next().await.unwrap()?; + let Message::Barrier(ref barrier) = msg else { + unreachable!(""); + }; + barrier_batch.push(barrier.clone()); + } + assert!(!barrier_batch.is_empty()); self.inner - .dispatch(msg) - .instrument(tracing_span) - .instrument_await(span) + .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone())) + .instrument(tracing::info_span!("dispatch_barrier_batch")) + .instrument_await("dispatch_barrier_batch") .await?; - if let Some(barrier) = barrier { + self.inner + .metrics + .metrics + .barrier_batch_size + .observe(barrier_batch.len() as f64); + for barrier in barrier_batch { yield barrier; } } @@ -503,9 +557,9 @@ macro_rules! impl_dispatcher { } } - pub async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { + pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> { match self { - $( Self::$variant_name(inner) => inner.dispatch_barrier(barrier).await, )* + $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )* } } @@ -566,8 +620,8 @@ pub trait DispatchFuture<'a> = Future> + Send; pub trait Dispatcher: Debug + 'static { /// Dispatch a data chunk to downstream actors. fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>; - /// Dispatch a barrier to downstream actors, generally by broadcasting it. - fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> impl DispatchFuture<'_>; + /// Dispatch barriers to downstream actors, generally by broadcasting it. + fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>; /// Dispatch a watermark to downstream actors, generally by broadcasting it. fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>; @@ -597,7 +651,7 @@ pub trait Dispatcher: Debug + 'static { /// always unlimited. async fn broadcast_concurrent( outputs: impl IntoIterator, - message: DispatcherMessage, + message: DispatcherMessageBatch, ) -> StreamResult<()> { futures::future::try_join_all( outputs @@ -644,23 +698,30 @@ impl Dispatcher for RoundRobinDataDispatcher { }; self.outputs[self.cur] - .send(DispatcherMessage::Chunk(chunk)) + .send(DispatcherMessageBatch::Chunk(chunk)) .await?; self.cur += 1; self.cur %= self.outputs.len(); Ok(()) } - async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { + async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> { // always broadcast barrier - broadcast_concurrent(&mut self.outputs, DispatcherMessage::Barrier(barrier)).await + broadcast_concurrent( + &mut self.outputs, + DispatcherMessageBatch::BarrierBatch(barriers), + ) + .await } async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { // always broadcast watermark - broadcast_concurrent(&mut self.outputs, DispatcherMessage::Watermark(watermark)) - .await?; + broadcast_concurrent( + &mut self.outputs, + DispatcherMessageBatch::Watermark(watermark), + ) + .await?; } Ok(()) } @@ -734,16 +795,23 @@ impl Dispatcher for HashDataDispatcher { self.outputs.extend(outputs); } - async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { + async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> { // always broadcast barrier - broadcast_concurrent(&mut self.outputs, DispatcherMessage::Barrier(barrier)).await + broadcast_concurrent( + &mut self.outputs, + DispatcherMessageBatch::BarrierBatch(barriers), + ) + .await } async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { // always broadcast watermark - broadcast_concurrent(&mut self.outputs, DispatcherMessage::Watermark(watermark)) - .await?; + broadcast_concurrent( + &mut self.outputs, + DispatcherMessageBatch::Watermark(watermark), + ) + .await?; } Ok(()) } @@ -842,7 +910,7 @@ impl Dispatcher for HashDataDispatcher { new_stream_chunk ); output - .send(DispatcherMessage::Chunk(new_stream_chunk)) + .send(DispatcherMessageBatch::Chunk(new_stream_chunk)) .await?; } StreamResult::Ok(()) @@ -913,14 +981,18 @@ impl Dispatcher for BroadcastDispatcher { } else { chunk.project(&self.output_indices) }; - broadcast_concurrent(self.outputs.values_mut(), DispatcherMessage::Chunk(chunk)).await + broadcast_concurrent( + self.outputs.values_mut(), + DispatcherMessageBatch::Chunk(chunk), + ) + .await } - async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { + async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> { // always broadcast barrier broadcast_concurrent( self.outputs.values_mut(), - DispatcherMessage::Barrier(barrier), + DispatcherMessageBatch::BarrierBatch(barriers), ) .await } @@ -930,7 +1002,7 @@ impl Dispatcher for BroadcastDispatcher { // always broadcast watermark broadcast_concurrent( self.outputs.values_mut(), - DispatcherMessage::Watermark(watermark), + DispatcherMessageBatch::Watermark(watermark), ) .await?; } @@ -1003,11 +1075,11 @@ impl Dispatcher for SimpleDispatcher { assert!(self.output.len() <= 2); } - async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { + async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> { // Only barrier is allowed to be dispatched to multiple outputs during migration. for output in &mut self.output { output - .send(DispatcherMessage::Barrier(barrier.clone())) + .send(DispatcherMessageBatch::BarrierBatch(barriers.clone())) .await?; } Ok(()) @@ -1027,7 +1099,7 @@ impl Dispatcher for SimpleDispatcher { } else { chunk.project(&self.output_indices) }; - output.send(DispatcherMessage::Chunk(chunk)).await + output.send(DispatcherMessageBatch::Chunk(chunk)).await } async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { @@ -1038,7 +1110,9 @@ impl Dispatcher for SimpleDispatcher { .expect("expect exactly one output"); if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { - output.send(DispatcherMessage::Watermark(watermark)).await?; + output + .send(DispatcherMessageBatch::Watermark(watermark)) + .await?; } Ok(()) } @@ -1464,7 +1538,7 @@ mod tests { } else { let message = guard.first().unwrap(); let real_chunk = match message { - DispatcherMessage::Chunk(chunk) => chunk, + DispatcherMessageBatch::Chunk(chunk) => chunk, _ => panic!(), }; real_chunk diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 1c25eab15256f..2c446de4bcf2d 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -25,8 +25,8 @@ use tokio::sync::mpsc; use super::permit::Receiver; use crate::executor::prelude::*; use crate::executor::{ - BarrierInner, DispatcherBarrier, DispatcherMessage, DispatcherMessageStream, - DispatcherMessageStreamItem, + BarrierInner, DispatcherBarrier, DispatcherMessage, DispatcherMessageBatch, + DispatcherMessageStream, DispatcherMessageStreamItem, }; use crate::task::{FragmentId, SharedContext, UpDownActorIds, UpDownFragmentIds}; @@ -128,7 +128,9 @@ mod local_input { async fn run_inner(mut channel: Receiver, upstream_actor_id: ActorId) { let span: await_tree::Span = format!("LocalInput (actor {upstream_actor_id})").into(); while let Some(msg) = channel.recv().verbose_instrument_await(span.clone()).await { - yield msg; + for m in msg.into_messages() { + yield m; + } } // Always emit an error outside the loop. This is because we use barrier as the control // message to stop the stream. Reaching here means the channel is closed unexpectedly. @@ -266,12 +268,13 @@ mod remote_input { while let Some(data_res) = stream.next().verbose_instrument_await(span.clone()).await { match data_res { Ok(GetStreamResponse { message, permits }) => { + use crate::executor::DispatcherMessageBatch; let msg = message.unwrap(); - let bytes = DispatcherMessage::get_encoded_len(&msg); + let bytes = DispatcherMessageBatch::get_encoded_len(&msg); exchange_frag_recv_size_metrics.inc_by(bytes as u64); - let msg_res = DispatcherMessage::from_protobuf(&msg); + let msg_res = DispatcherMessageBatch::from_protobuf(&msg); if let Some(add_back_permits) = match permits.unwrap().value { // For records, batch the permits we received to reduce the backward // `AddPermits` messages. @@ -294,8 +297,9 @@ mod remote_input { } let msg = msg_res.context("RemoteInput decode message error")?; - - yield msg; + for m in msg.into_messages() { + yield m; + } } Err(e) => Err(ExchangeChannelClosed::remote_input(up_down_ids.0, Some(e)))?, @@ -358,3 +362,16 @@ pub(crate) fn new_input( Ok(input) } + +impl DispatcherMessageBatch { + fn into_messages(self) -> Vec { + match self { + DispatcherMessageBatch::BarrierBatch(barriers) => barriers + .into_iter() + .map(|b| DispatcherMessage::Barrier(b)) + .collect(), + DispatcherMessageBatch::Chunk(c) => vec![DispatcherMessage::Chunk(c)], + DispatcherMessageBatch::Watermark(w) => vec![DispatcherMessage::Watermark(w)], + } + } +} diff --git a/src/stream/src/executor/exchange/output.rs b/src/stream/src/executor/exchange/output.rs index 145286f561e17..6d5129516f1c4 100644 --- a/src/stream/src/executor/exchange/output.rs +++ b/src/stream/src/executor/exchange/output.rs @@ -22,7 +22,7 @@ use risingwave_common::util::addr::is_local_address; use super::error::ExchangeChannelClosed; use super::permit::Sender; use crate::error::StreamResult; -use crate::executor::DispatcherMessage as Message; +use crate::executor::DispatcherMessageBatch as Message; use crate::task::{ActorId, SharedContext}; /// `Output` provides an interface for `Dispatcher` to send data into downstream actors. diff --git a/src/stream/src/executor/exchange/permit.rs b/src/stream/src/executor/exchange/permit.rs index 8c86eb2753811..5696bbfc0f4bb 100644 --- a/src/stream/src/executor/exchange/permit.rs +++ b/src/stream/src/executor/exchange/permit.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use risingwave_pb::task_service::permits; use tokio::sync::{mpsc, AcquireError, Semaphore, SemaphorePermit}; -use crate::executor::DispatcherMessage as Message; +use crate::executor::DispatcherMessageBatch as Message; /// Message with its required permits. /// @@ -132,7 +132,7 @@ impl Sender { } Some(permits::Value::Record(card as _)) } - Message::Barrier(_) => Some(permits::Value::Barrier(1)), + Message::BarrierBatch(_) => Some(permits::Value::Barrier(1)), Message::Watermark(_) => None, }; @@ -221,13 +221,16 @@ mod tests { let (tx, mut rx) = channel(0, 0, 1); let send = || { - tx.send(Message::Barrier(Barrier::with_prev_epoch_for_test( + tx.send(Message::BarrierBatch(Barrier::with_prev_epoch_for_test( 514, 114, ))) }; assert_matches!(send().now_or_never(), Some(Ok(_))); // send successfully - assert_matches!(rx.recv().now_or_never(), Some(Some(Message::Barrier(_)))); // recv successfully + assert_matches!( + rx.recv().now_or_never(), + Some(Some(Message::BarrierBatch(_))) + ); // recv successfully assert_matches!(send().now_or_never(), Some(Ok(_))); // send successfully // do not recv, so that the channel is full diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 1d0c58acfef80..605d5792da78f 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -357,11 +357,13 @@ impl StreamConsumer for SenderConsumer { channel .send(match msg { - Message::Chunk(chunk) => DispatcherMessage::Chunk(chunk), + Message::Chunk(chunk) => DispatcherMessageBatch::Chunk(chunk), Message::Barrier(barrier) => { - DispatcherMessage::Barrier(barrier.into_dispatcher()) + DispatcherMessageBatch::BarrierBatch(vec![barrier.into_dispatcher()]) + } + Message::Watermark(watermark) => { + DispatcherMessageBatch::Watermark(watermark) } - Message::Watermark(watermark) => DispatcherMessage::Watermark(watermark), }) .await .expect("failed to send message"); diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 0e5e7862e9605..f5acfecd6ff72 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -37,13 +37,12 @@ use risingwave_pb::data::PbEpoch; use risingwave_pb::expr::PbInputRef; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation; -use risingwave_pb::stream_plan::stream_message::StreamMessage; use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate}; use risingwave_pb::stream_plan::{ BarrierMutation, CombinedMutation, Dispatchers, DropSubscriptionsMutation, PauseMutation, - PbAddMutation, PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessage, PbUpdateMutation, - PbWatermark, ResumeMutation, SourceChangeSplitMutation, StopMutation, SubscriptionUpstreamInfo, - ThrottleMutation, + PbAddMutation, PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, + PbUpdateMutation, PbWatermark, ResumeMutation, SourceChangeSplitMutation, StopMutation, + SubscriptionUpstreamInfo, ThrottleMutation, }; use smallvec::SmallVec; @@ -169,10 +168,11 @@ use self::barrier_align::AlignedMessageStream; pub type MessageStreamItemInner = StreamExecutorResult>; pub type MessageStreamItem = MessageStreamItemInner; -pub type DispatcherMessageStreamItem = MessageStreamItemInner<()>; +pub type DispatcherMessageStreamItem = StreamExecutorResult; pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>; pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch}; +use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch}; use risingwave_pb::stream_plan::throttle_mutation::RateLimit; pub trait MessageStreamInner = Stream> + Send; @@ -1046,6 +1046,20 @@ impl MessageInner { pub type Message = MessageInner; pub type DispatcherMessage = MessageInner<()>; +// pub type DispatcherMessage = MessageInner<()>; +#[derive(Debug, EnumAsInner, PartialEq, Clone)] +pub enum MessageBatch { + Chunk(StreamChunk), + BarrierBatch(Vec>), + Watermark(Watermark), +} +pub type DispatcherBarriers = Vec; +#[derive(Debug, EnumAsInner, PartialEq, Clone)] +pub enum DispatcherMessageBatch { + Chunk(StreamChunk), + BarrierBatch(Vec>), + Watermark(Watermark), +} impl From for Message { fn from(chunk: StreamChunk) -> Self { @@ -1082,37 +1096,48 @@ impl Message { } } -impl DispatcherMessage { - pub fn to_protobuf(&self) -> PbStreamMessage { +impl DispatcherMessageBatch { + pub fn to_protobuf(&self) -> PbStreamMessageBatch { let prost = match self { Self::Chunk(stream_chunk) => { let prost_stream_chunk = stream_chunk.to_protobuf(); - StreamMessage::StreamChunk(prost_stream_chunk) + StreamMessageBatch::StreamChunk(prost_stream_chunk) } - Self::Barrier(barrier) => StreamMessage::Barrier(barrier.clone().to_protobuf()), - Self::Watermark(watermark) => StreamMessage::Watermark(watermark.to_protobuf()), + Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch { + barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(), + }), + Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()), }; - PbStreamMessage { - stream_message: Some(prost), + PbStreamMessageBatch { + stream_message_batch: Some(prost), } } - pub fn from_protobuf(prost: &PbStreamMessage) -> StreamExecutorResult { - let res = match prost.get_stream_message()? { - StreamMessage::StreamChunk(chunk) => Self::Chunk(StreamChunk::from_protobuf(chunk)?), - StreamMessage::Barrier(barrier) => Self::Barrier( - DispatcherBarrier::from_protobuf_inner(barrier, |mutation| { - if mutation.is_some() { - if cfg!(debug_assertions) { - panic!("should not receive message of barrier with mutation"); - } else { - warn!(?barrier, "receive message of barrier with mutation"); - } - } - Ok(()) - })?, - ), - StreamMessage::Watermark(watermark) => { + pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult { + let res = match prost.get_stream_message_batch()? { + StreamMessageBatch::StreamChunk(chunk) => { + Self::Chunk(StreamChunk::from_protobuf(chunk)?) + } + StreamMessageBatch::BarrierBatch(barrier_batch) => { + let barriers = barrier_batch + .barriers + .iter() + .map(|barrier| { + DispatcherBarrier::from_protobuf_inner(barrier, |mutation| { + if mutation.is_some() { + if cfg!(debug_assertions) { + panic!("should not receive message of barrier with mutation"); + } else { + warn!(?barrier, "receive message of barrier with mutation"); + } + } + Ok(()) + }) + }) + .try_collect()?; + Self::BarrierBatch(barriers) + } + StreamMessageBatch::Watermark(watermark) => { Self::Watermark(Watermark::from_protobuf(watermark)?) } }; diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index ccefd82cfd2e9..f59382ff80884 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -160,6 +160,7 @@ pub struct StreamingMetrics { pub barrier_inflight_latency: Histogram, /// The duration of sync to storage. pub barrier_sync_latency: Histogram, + pub barrier_batch_size: Histogram, /// The progress made by the earliest in-flight barriers in the local barrier manager. pub barrier_manager_progress: IntCounter, @@ -847,6 +848,13 @@ impl StreamingMetrics { ); let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap(); + let opts = histogram_opts!( + "stream_barrier_batch_size", + "barrier_batch_size", + exponential_buckets(1.0, 2.0, 8).unwrap() + ); + let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap(); + let barrier_manager_progress = register_int_counter_with_registry!( "stream_barrier_manager_progress", "The number of actors that have processed the earliest in-flight barriers", @@ -1129,6 +1137,7 @@ impl StreamingMetrics { over_window_same_output_count, barrier_inflight_latency, barrier_sync_latency, + barrier_batch_size, barrier_manager_progress, kv_log_store_storage_write_count, kv_log_store_storage_write_size,