Skip to content

Commit

Permalink
refactor(stream): send barriers in batch when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Dec 25, 2024
1 parent 81f651f commit 997af79
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 117 deletions.
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ message StreamMessage {
}
}

message StreamMessageBatch {
message BarrierBatch {
repeated Barrier barriers = 1;
}
oneof stream_message_batch {
data.StreamChunk stream_chunk = 1;
BarrierBatch barrier_batch = 2;
Watermark watermark = 3;
}
}


// Hash mapping for compute node. Stores mapping from virtual node to actor id.
message ActorMapping {
repeated uint32 original_indices = 1;
Expand Down
2 changes: 1 addition & 1 deletion proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ message GetStreamRequest {
}

message GetStreamResponse {
stream_plan.StreamMessage message = 1;
stream_plan.StreamMessageBatch message = 1;

Check failure on line 110 in proto/task_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "message" on message "GetStreamResponse" changed type from "stream_plan.StreamMessage" to "stream_plan.StreamMessageBatch".
// The number of permits acquired for this message, which should be sent back to the upstream with `add_permits`.
// In theory, this can also be derived from the message itself by the receiver. Here we send it explicitly to
// avoid any sense of inconsistency for the derivation logic, so the receiver can simply send it back verbatim.
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/exchange_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_pb::task_service::{
permits, GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
};
use risingwave_stream::executor::exchange::permit::{MessageWithPermits, Receiver};
use risingwave_stream::executor::DispatcherMessage;
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -172,13 +171,14 @@ impl ExchangeServiceImpl {
permits.add_permits(permits_to_add);
}
Either::Right(MessageWithPermits { message, permits }) => {
use risingwave_stream::executor::DispatcherMessageBatch;
let proto = message.to_protobuf();
// forward the acquired permit to the downstream
let response = GetStreamResponse {
message: Some(proto),
permits: Some(PbPermits { value: permits }),
};
let bytes = DispatcherMessage::get_encoded_len(&response);
let bytes = DispatcherMessageBatch::get_encoded_len(&response);

yield response;

Expand Down
Loading

0 comments on commit 997af79

Please sign in to comment.