[feature] Support sink.committer-coordinator-operator.enabled#8372
fishfishfishfishaa wants to merge 3 commits into
| TestContext context = createContext(); | ||
| writeRecords(context.inputDirectory, 0, 20); | ||
| // Start the streaming job first and complete one normal checkpoint, so that the coordinator has already performed one commit. |
… comment & e2e time out set 180
ifndef-SleePy left a comment
Thanks for contributing. I went through this revision roughly. There are several architectural design issues.
1. Writer checkpoint processing is still coupled with coordinator response
The writer still waits synchronously for the coordinator during checkpoint processing. This puts JM-side latency back on the writer checkpoint path and can become a serious bottleneck for large-parallelism streaming jobs.
2. The failover protocol is too complex and still has coordinator recovery bugs
This PR introduces many extra protocol states to handle failover issues. I am not sure I fully understand all these behaviors, but I think it is hard to cover all corner cases when checkpoint completion/abort, writer task failover, and JM/coordinator failover happen at the same time.
However, I am pretty sure the coordinator start() and resetToCheckpoint() lifecycle is not correct when global failover happens. I think it should be easy to reproduce the bug by running a streaming job with checkpointing enabled and killing the JM after some checkpoints have completed successfully. The OperatorCoordinator failover scenario is much more complex than the assumption made in this PR.
3. Coordinator state does not include CommitListener state
The coordinator checkpoint state only persists commitUser. Once commit runs in the coordinator, the CommitListener state also belongs to the coordinator side, but it is not persisted in the coordinator state.
| new SerializedValue<CoordinationRequest>(request); | ||
| FileInfoReceivedResponse response = | ||
| CoordinationResponseUtils.unwrap( | ||
| gateway.sendRequestToCoordinator(operatorId, serializedRequest).get()); |
This is a synchronous call on the writer side. The writer thread will block here until the coordinator processes the request and returns the ACK.
In high-parallelism jobs, this can become a serious performance bottleneck because checkpoint processing now depends on a synchronous JM-side request/response path.
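To make the concern concrete, here is a minimal sketch in plain Java with no Flink dependencies. `AckSketch`, `sendAndWait`, and `sendAndContinue` are hypothetical names used only for illustration; they are not part of this PR or of any Flink API. The first method parks the calling thread until the response arrives, which is what the `.get()` call above does; the second only registers a callback and returns immediately.

```java
import java.util.concurrent.CompletableFuture;

// Plain-Java sketch; AckSketch and its methods are hypothetical, not Flink or Paimon APIs.
class AckSketch {
    // Blocking style, comparable to calling get() on the coordinator response:
    // the calling thread is parked until the ACK arrives.
    static String sendAndWait(CompletableFuture<String> ack) {
        return ack.join();
    }

    // Non-blocking style: register what should happen on ACK and return immediately.
    static CompletableFuture<Void> sendAndContinue(
            CompletableFuture<String> ack, StringBuilder log) {
        return ack.thenAccept(response -> log.append("acked:").append(response));
    }
}
```

With the second style the writer thread is free as soon as the request is sent; whether and where the result must eventually be awaited is a design decision this sketch does not settle.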
| freshInstance = false; | ||
| LOG.info("PWC snapshot commitUser={}, checkpointId={}", commitUser, checkpointId); | ||
| checkState(commitUser != null, "PWC has not been started."); | ||
| result.complete(serializeCoordinatorState(commitUser)); |
The coordinator checkpoint state only contains commitUser. Any committer/listener state owned by the coordinator-side committer is not persisted here.
| public void resetToCheckpoint(long checkpointId, byte[] bytes) throws Exception { | ||
| LOG.info("PWC resetToCheckpoint: checkpointId={}, fresh={}", checkpointId, freshInstance); | ||
| if (freshInstance && checkpointId >= 0) { | ||
| checkState(!started, "PWC can only be restored before it is started."); |
This does not cover the lifecycle where resetToCheckpoint() is invoked after start().
If you don't mind, I will open the second PR under #8220. You can compare it with this PR and check whether the concerns I raised above are valid. Also, I think it would be helpful to communicate through the mailing list, GitHub issue, or any other convenient place before coding. That should help reduce duplicated work.
| long maxWatermark() { | ||
| long watermark = Long.MIN_VALUE; | ||
| for (SubtaskFileInfo fileInfo : fileInfos.values()) { | ||
| watermark = Math.max(watermark, fileInfo.request().watermark()); |
This should preserve Flink input-watermark semantics and aggregate the writer watermarks with min, not max. The previous single committer operator only advanced its input watermark after all upstream channels advanced, so a checkpoint with writer watermarks [1000, 0] would commit watermark 0. With max here the coordinator can commit watermark 1000, which may advance snapshot watermark / partition-mark-done too early while slower writers can still produce older partitions.
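A small sketch of the difference, using the [1000, 0] example above. `WatermarkAggregationSketch` is a hypothetical stand-alone class, not code from this PR, and the empty-input behavior is an assumption made only to keep the sketch total.

```java
import java.util.List;

// Hypothetical stand-alone sketch; not the coordinator code from this PR.
class WatermarkAggregationSketch {
    // Aggregate with min: the result only advances once every writer has advanced.
    static long minWatermark(List<Long> writerWatermarks) {
        if (writerWatermarks.isEmpty()) {
            return Long.MIN_VALUE; // assumption: no writer reported, so no watermark is known
        }
        long watermark = Long.MAX_VALUE;
        for (long w : writerWatermarks) {
            watermark = Math.min(watermark, w);
        }
        return watermark;
    }

    // Aggregate with max: the result follows the fastest writer.
    static long maxWatermark(List<Long> writerWatermarks) {
        long watermark = Long.MIN_VALUE;
        for (long w : writerWatermarks) {
            watermark = Math.max(watermark, w);
        }
        return watermark;
    }
}
```

For writer watermarks 1000 and 0, `minWatermark` yields 0 and `maxWatermark` yields 1000, which is the gap described above.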
| private void removePendingSubtask(int subtask) { | ||
| for (PendingCheckpoint checkpoint : checkpoints.values()) { | ||
| if (!checkpoint.staged()) { |
Staged checkpoints from this attempt still need to be invalidated if the attempt fails before the checkpoint completes. The coordinator stages file info as soon as all requests arrive, but the Flink checkpoint can still abort afterwards. With parallelism 1, subtask 0 can send CK1 and have CK1 marked staged, then fail before CK1 completes; removePendingSubtask leaves the staged CK1 files here. The replacement attempt restores from the previous completed checkpoint and replays the same records into CK2, and completing CK2 then commits both the stale CK1 files from the failed attempt and the replayed CK2 files. Please drop staged data produced by failed attempts until the corresponding checkpoint has completed, or reset the coordinator state for this failure path.
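One possible shape of the fix, as a simplified sketch rather than a patch against this PR: `StagedCheckpointSketch` and its methods are hypothetical, file info is reduced to strings, and the staged flag is left out because the point is that a failed subtask's data is dropped whether or not its checkpoint was already staged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the coordinator's pending state.
class StagedCheckpointSketch {
    // checkpointId -> (subtask -> files reported by that subtask)
    private final TreeMap<Long, Map<Integer, List<String>>> pending = new TreeMap<>();

    void report(long checkpointId, int subtask, List<String> files) {
        pending.computeIfAbsent(checkpointId, id -> new TreeMap<>()).put(subtask, files);
    }

    // On subtask failure, drop everything that subtask reported for checkpoints
    // that have not completed yet, staged or not.
    void subtaskFailed(int subtask) {
        for (Map<Integer, List<String>> perSubtask : pending.values()) {
            perSubtask.remove(subtask);
        }
        pending.values().removeIf(Map::isEmpty);
    }

    // On checkpoint completion, return and clear everything up to checkpointId.
    List<String> checkpointCompleted(long checkpointId) {
        List<String> committed = new ArrayList<>();
        Map<Long, Map<Integer, List<String>>> upTo = pending.headMap(checkpointId, true);
        for (Map<Integer, List<String>> perSubtask : upTo.values()) {
            for (List<String> files : perSubtask.values()) {
                committed.addAll(files);
            }
        }
        upTo.clear();
        return committed;
    }
}
```

Replaying the parallelism-1 scenario above (report CK1, fail subtask 0, report CK2, complete CK2) commits only the CK2 files in this model.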
Description
This PR implements PIP-30: Improvement For Paimon Committer In Flink.
One important piece of background is that, for a given Flink checkpoint, the coordinator checkpoint is triggered before writer tasks start their checkpoint. Because of this ordering, I first tried a minimal prototype which maintained coordinator state through a custom state mechanism when Flink did not expose checkpoint state for this use case: #7963.
However, that approach introduced several problems, including two independent state systems, higher complexity, extra dependency on HDFS, and additional blocking while waiting for custom state to be persisted to HDFS.
Thanks to @liubiao for proposing the current design:
https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0
The key idea is to move the durable file-info state to the writer task side, so it is persisted together with Flink checkpoint state. Meanwhile, PWC (PaimonWriterCoordinator) keeps an in-memory pending state for received file info. If the coordinator itself fails over, the in-memory committable map can be rebuilt by writer tasks replaying their pending file info from writer state. The coordinator only persists necessary metadata through the normal coordinator checkpoint interface, such as commitUser.
Following this direction, this PR focuses the optimization on the unaware bucket append path, while keeping the original FixedBucketSink path unaffected.
Design
When the option is enabled, writer tasks still flush data and produce committables during prepareSnapshotPreBarrier.
In addition, each writer task records the produced file info in writer-side state in a checkpointId -> fileInfo form. This state is part of Flink operator state and is persisted together with Flink checkpoints.
During snapshotState, the writer sends the file info to PaimonWriterCoordinator through FileInfoRequest and waits for the ACK before snapshotState returns.
If the request fails, the writer snapshot fails and the checkpoint cannot complete. If the request blocks for too long, the wait is bounded by the Flink checkpoint timeout.
On the PWC side:
Flink calls notifyCheckpointComplete once a checkpoint completes, and PWC commits all committables up to that checkpoint.
After commit succeeds, PWC sends CommitCompleteEvent to active writer subtasks. Writers use this event to clean local pending state up to the committed checkpoint, preventing writer-side state from growing forever.
Recovery Semantics
Writer Task Failover
When a writer task fails over, the new attempt restores writer state in initializeState.
The recovered writer then sends its pending file info to PWC through the same request path. PWC tracks valid execution attempts through executionAttemptReady and executionAttemptFailed, so stale attempt requests can be rejected, and pending file info from failed attempts can be cleaned or replaced.
For recovered file info, PWC waits until all recovered writer requests are received, and then calls filterAndCommitUpToCheckpoint. If this commit creates a new snapshot, PWC treats it as an incomplete previous commit and triggers one extra failover, following the original recommit semantics.
JM / PWC Failover
For JM / PWC failover, PWC does not persist the full in-memory committable map as coordinator state.
Instead, PWC only restores necessary coordinator metadata from Flink coordinator checkpoint state, such as commitUser. The actual pending file info is restored from writer task state. After recovery, writer tasks replay their pending file info through FileInfoRequest, and PWC rebuilds its in-memory committable map from these recovered requests.
In other words, PWC pending commit state is recovered by writer-state replay, not by independently persisting the full coordinator-side committable map.
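The replay-based rebuild can be pictured with the following simplified sketch. It is not the PWC implementation: `ReplayRebuildSketch`, `onReplay`, and `rebuilt` are hypothetical names, file info is reduced to strings, and a fixed parallelism is assumed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model: the coordinator starts empty after failover and is refilled by writers.
class ReplayRebuildSketch {
    private final int parallelism;
    private final Set<Integer> replayed = new HashSet<>();
    private final TreeMap<Long, List<String>> committables = new TreeMap<>();

    ReplayRebuildSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    // Called when a recovered writer replays its pending checkpointId -> fileInfo state.
    // Returns true once every subtask has replayed, i.e. the map is complete again.
    boolean onReplay(int subtask, Map<Long, List<String>> pendingFromWriterState) {
        if (!replayed.add(subtask)) {
            return replayed.size() == parallelism; // duplicate replay from the same subtask is ignored
        }
        for (Map.Entry<Long, List<String>> e : pendingFromWriterState.entrySet()) {
            committables.computeIfAbsent(e.getKey(), id -> new ArrayList<>()).addAll(e.getValue());
        }
        return replayed.size() == parallelism;
    }

    Map<Long, List<String>> rebuilt() {
        return committables;
    }
}
```

Only once `onReplay` returns true does the rebuilt map hold the pending file info of all writers, which mirrors the "wait until all recovered writer requests are received" step described for writer failover.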
Checkpoint Abort Without Task Failure
If a checkpoint is aborted but writer tasks do not fail, already ACKed file info is kept as reliable pending state in PWC memory.
A later successful checkpoint envelope can stage and commit this file info. This preserves the original semantics that checkpoint abort does not mean data rollback.
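The writer-side pending state and its two triggers, checkpoint abort and CommitCompleteEvent, can be sketched as follows. This is an illustrative model only: `WriterPendingStateSketch` and its method names are hypothetical, and file info is reduced to strings.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of the writer-side checkpointId -> fileInfo state.
class WriterPendingStateSketch {
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    // Record the file info produced for a checkpoint.
    void record(long checkpointId, List<String> fileInfo) {
        pending.put(checkpointId, fileInfo);
    }

    // Abort without task failure: nothing is rolled back, the entry stays pending.
    void onCheckpointAborted(long checkpointId) {
        // intentionally a no-op
    }

    // CommitCompleteEvent: drop everything up to and including the committed checkpoint.
    void onCommitComplete(long committedCheckpointId) {
        pending.headMap(committedCheckpointId, true).clear();
    }

    Set<Long> pendingCheckpointIds() {
        return pending.keySet();
    }
}
```

In this model, aborting checkpoint 1 leaves its file info in place, and a later commit up to checkpoint 2 removes entries 1 and 2 together, so the state stays bounded without any rollback on abort.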
Scope
This PR focuses on the unaware bucket append path, especially RowAppendTableSink, where removing the committer operator can help reduce the failover region.
It does not change the original FixedBucketSink commit path. Fixed bucket and other paths involving shuffle, assigner, index, or compact operators are not included in this optimization, because those operators may already bind regions together or require different recovery semantics.
Tests
This PR adds and updates tests for the new PWC commit path.
Unit tests cover:
FileInfoRequest request / ACK behavior;
IT coverage verifies coordinator commit behavior and ensures non-target sink paths are not affected.
E2E coverage verifies: