refactor(stream): send barriers in batch when possible #19932

Open
wants to merge 9 commits into
base: main
11 changes: 11 additions & 0 deletions proto/stream_plan.proto
@@ -175,6 +175,17 @@ message StreamMessage {
   }
 }

+message StreamMessageBatch {
+  message BarrierBatch {
+    repeated Barrier barriers = 1;
+  }
+  oneof stream_message_batch {
+    data.StreamChunk stream_chunk = 1;
+    BarrierBatch barrier_batch = 2;
+    Watermark watermark = 3;
+  }
+}
+
 // Hash mapping for compute node. Stores mapping from virtual node to actor id.
 message ActorMapping {
   repeated uint32 original_indices = 1;
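Note: the new `StreamMessageBatch` keeps the same three variants as a single stream message, except that the barrier slot carries a list. A minimal sketch of that shape in plain Rust follows. All type names, the `epoch` field, and the chunk/watermark payload types here are hypothetical stand-ins for illustration, not the generated protobuf types or the PR's actual `DispatcherMessageBatch`.

```rust
// Hypothetical stand-in for a barrier; the real type carries more fields.
#[derive(Debug, Clone, PartialEq)]
struct Barrier {
    epoch: u64, // assumed field, for illustration only
}

// One message as sent before batching (payload types are placeholders).
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Chunk(Vec<i64>),
    Barrier(Barrier),
    Watermark(i64),
}

// Mirrors the `oneof` above: only the barrier variant holds several items.
#[derive(Debug, Clone, PartialEq)]
enum MessageBatch {
    Chunk(Vec<i64>),
    BarrierBatch(Vec<Barrier>),
    Watermark(i64),
}

impl From<Message> for MessageBatch {
    // A single barrier becomes a batch of one; other variants map one to one.
    fn from(m: Message) -> Self {
        match m {
            Message::Chunk(c) => MessageBatch::Chunk(c),
            Message::Barrier(b) => MessageBatch::BarrierBatch(vec![b]),
            Message::Watermark(w) => MessageBatch::Watermark(w),
        }
    }
}

impl MessageBatch {
    // Expands a batch back into individual messages, preserving barrier order.
    fn into_messages(self) -> Vec<Message> {
        match self {
            MessageBatch::Chunk(c) => vec![Message::Chunk(c)],
            MessageBatch::BarrierBatch(bs) => bs.into_iter().map(Message::Barrier).collect(),
            MessageBatch::Watermark(w) => vec![Message::Watermark(w)],
        }
    }
}
```

Under these assumptions a receiver can unpack a batch with `into_messages()` and keep processing messages one at a time, so only the wire format changes.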
2 changes: 1 addition & 1 deletion proto/task_service.proto
@@ -107,7 +107,7 @@
 }

 message GetStreamResponse {
-  stream_plan.StreamMessage message = 1;
+  stream_plan.StreamMessageBatch message = 1;

Check failure on line 110 in proto/task_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" with name "message" on message "GetStreamResponse" changed type from "stream_plan.StreamMessage" to "stream_plan.StreamMessageBatch".

   // The number of permits acquired for this message, which should be sent back to the upstream with `add_permits`.
   // In theory, this can also be derived from the message itself by the receiver. Here we send it explicitly to
   // avoid any sense of inconsistency for the derivation logic, so the receiver can simply send it back verbatim.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
@@ -1150,6 +1150,10 @@ pub struct StreamingDeveloperConfig {
     /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
     /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
     pub switch_jdbc_pg_to_native: bool,
+
+    /// The maximum number of consecutive barriers allowed in a message when sent between actors.
+    #[serde(default = "default::developer::stream_max_barrier_batch_size")]
+    pub max_barrier_batch_size: u32,
 }

 /// The subsections `[batch.developer]`.
@@ -2013,6 +2017,10 @@ pub mod default {
             32768
         }

+        pub fn stream_max_barrier_batch_size() -> u32 {
+            1024
+        }
+
         pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
             64 << 20 // 64MB
         }
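Note: the doc comment on `max_barrier_batch_size` describes a cap on consecutive barriers per message. One way such a cap could be applied is sketched below. The `Msg` and `Batch` types, the `coalesce` function, and the choice to treat a limit of 0 as 1 are all assumptions made for illustration; this is not the batching logic from this PR.

```rust
// Hypothetical simplified messages; payloads are placeholder integers.
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Chunk(u32),
    Barrier(u64), // carries an assumed epoch number
    Watermark(u64),
}

#[derive(Debug, Clone, PartialEq)]
enum Batch {
    Chunk(u32),
    Barriers(Vec<u64>),
    Watermark(u64),
}

// Groups runs of consecutive barriers into batches of at most
// `max_barrier_batch_size` items. Chunks and watermarks pass through
// unchanged and end the current run, so message order is preserved.
fn coalesce(msgs: Vec<Msg>, max_barrier_batch_size: usize) -> Vec<Batch> {
    // Assumption: a limit of 0 is treated as 1 so every barrier is still sent.
    let limit = max_barrier_batch_size.max(1);
    let mut out: Vec<Batch> = Vec::new();
    for msg in msgs {
        match msg {
            Msg::Barrier(epoch) => {
                // Extend the trailing barrier batch if it still has room.
                if let Some(Batch::Barriers(run)) = out.last_mut() {
                    if run.len() < limit {
                        run.push(epoch);
                        continue;
                    }
                }
                out.push(Batch::Barriers(vec![epoch]));
            }
            Msg::Chunk(c) => out.push(Batch::Chunk(c)),
            Msg::Watermark(w) => out.push(Batch::Watermark(w)),
        }
    }
    out
}
```

With a limit of 2, three consecutive barriers followed by a chunk and one more barrier would come out as a batch of two, a batch of one, the chunk, and a final batch of one.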
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/exchange_service.rs
@@ -25,7 +25,7 @@ use risingwave_pb::task_service::{
     permits, GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
 };
 use risingwave_stream::executor::exchange::permit::{MessageWithPermits, Receiver};
-use risingwave_stream::executor::DispatcherMessage;
+use risingwave_stream::executor::DispatcherMessageBatch;
 use risingwave_stream::task::LocalStreamManager;
 use thiserror_ext::AsReport;
 use tokio_stream::wrappers::ReceiverStream;
@@ -178,7 +178,7 @@ impl ExchangeServiceImpl {
                     message: Some(proto),
                     permits: Some(PbPermits { value: permits }),
                 };
-                let bytes = DispatcherMessage::get_encoded_len(&response);
+                let bytes = DispatcherMessageBatch::get_encoded_len(&response);

                 yield response;

1 change: 1 addition & 0 deletions src/config/example.toml
@@ -147,6 +147,7 @@ stream_exchange_connection_pool_size = 1
 stream_enable_auto_schema_change = true
 stream_enable_shared_source = true
 stream_switch_jdbc_pg_to_native = false
+stream_max_barrier_batch_size = 1024

 [storage]
 share_buffers_sync_parallelism = 1