Skip to content

[feature] Support sink.committer-coordinator-operator.enabled#8372

Open
fishfishfishfishaa wants to merge 3 commits into
apache:masterfrom
fishfishfishfishaa:pip30-redesign-0628
Open

[feature] Support sink.committer-coordinator-operator.enabled#8372
fishfishfishfishaa wants to merge 3 commits into
apache:masterfrom
fishfishfishfishaa:pip30-redesign-0628

Conversation

@fishfishfishfishaa

Copy link
Copy Markdown

Description

This PR implements PIP-30: Improvement For Paimon Committer In Flink.

One important background is that, for a given Flink checkpoint, the coordinator checkpoint is triggered before writer tasks start their checkpoint. Because of this ordering, I first tried a minimal prototype which maintained coordinator state through a custom state mechanism when Flink did not expose checkpoint state for this use case: #7963.

However, that approach introduced several problems, including two independent state systems, higher complexity, extra dependency on HDFS, and additional blocking while waiting for custom state to be persisted to HDFS.

Thanks to @liubiao for proposing the current design:
https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0

The key idea is to move the durable file-info state to the writer task side, so it is persisted together with Flink checkpoint state. Meanwhile, PWC keeps an in-memory pending state for received file info. If the coordinator itself fails over, the in-memory committable map can be rebuilt by writer tasks replaying their pending file info from writer state. The coordinator only persists necessary metadata through the normal coordinator checkpoint interface, such as commitUser.

Following this direction, this PR focuses the optimization on the unaware bucket append path, while keeping the original FixedBucketSink path unaffected.

Design

When the option is enabled, writer tasks still flush data and produce committables during prepareSnapshotPreBarrier.

In addition, each writer task records the produced file info in writer-side state in a checkpointId -> fileInfo form. This state is part of Flink operator state and is persisted together with Flink checkpoints.

During snapshotState:

  1. The writer first updates its local pending file-info state.
  2. Then it sends the unacknowledged file info to PaimonWriterCoordinator through FileInfoRequest.
  3. The writer waits for the PWC ACK before returning from snapshotState.

If the request fails, writer snapshot fails and the checkpoint cannot complete. If the request blocks for too long, it is guarded by Flink checkpoint timeout.

On the PWC side:

  1. PWC validates whether the request comes from a valid registered subtask attempt.
  2. PWC stores the received file info as reliable in-memory pending state.
  3. If the current checkpoint envelope has not received all subtasks, PWC replies ACK immediately.
  4. If the current request is from the last subtask of the checkpoint envelope, PWC stages pending file info up to this checkpoint into the committable map, and then replies ACK.
  5. Later, Flink checkpoint coordinator triggers notifyCheckpointComplete, and PWC commits all committables up to that checkpoint.

After commit succeeds, PWC sends CommitCompleteEvent to active writer subtasks. Writers use this event to clean local pending state up to the committed checkpoint, preventing writer-side state from growing forever.

Recovery Semantics

Writer Task Failover

When a writer task fails over, the new attempt restores writer state in initializeState.

The recovered writer then sends its pending file info to PWC through the same request path. PWC tracks valid execution attempts through executionAttemptReady and executionAttemptFailed, so stale attempt requests can be rejected, and pending file info from failed attempts can be cleaned or replaced.

For recovered file info, PWC waits until all recovered writer requests are received, and then calls filterAndCommitUpToCheckpoint. If this commit creates a new snapshot, PWC treats it as an incomplete previous commit and triggers one extra failover, following the original recommit semantics.

JM / PWC Failover

For JM / PWC failover, PWC does not persist the full in-memory committable map as coordinator state.

Instead, PWC only restores necessary coordinator metadata from Flink coordinator checkpoint state, such as commitUser. The actual pending file info is restored from writer task state. After recovery, writer tasks replay their pending file info through FileInfoRequest, and PWC rebuilds its in-memory committable map from these recovered requests.

In other words, PWC pending commit state is recovered by writer-state replay, not by independently persisting the full coordinator-side committable map.

Checkpoint Abort Without Task Failure

If a checkpoint is aborted but writer tasks do not fail, already ACKed file info is kept as reliable pending state in PWC memory.

A later successful checkpoint envelope can stage and commit this file info. This preserves the original semantics that checkpoint abort does not mean data rollback.

Scope

This PR focuses on the unaware bucket append path, especially RowAppendTableSink, where removing the committer operator can help reduce the failover region.

It does not change the original FixedBucketSink commit path. Fixed bucket and other paths involving shuffle, assigner, index, or compact operators are not included in this optimization, because those operators may already bind regions together or require different recovery semantics.

Tests

This PR adds and updates tests for the new PWC commit path.

Unit tests cover:

  • writer-side coordinated committable state and ACK tracking;
  • FileInfoRequest request / ACK behavior;
  • checkpoint complete and checkpoint abort handling in PWC;
  • recovered file info and recommit behavior;
  • stale attempt validation;
  • writer operator interaction with the coordinated file-info sender.

IT coverage verifies coordinator commit behavior and ensures non-target sink paths are not affected.

E2E coverage verifies:

  • committing through PWC without the original committer / compact operators in the job plan;
  • checkpoint abort without task failure, followed by successful commit through a later checkpoint;
  • TaskManager failure / region failover behavior;
  • savepoint restore replaying pending file info.

@Mrart

Mrart commented Jun 29, 2026

Copy link
Copy Markdown

@ifndef-SleePy @JingsongLi PTKL

TestContext context = createContext();
writeRecords(context.inputDirectory, 0, 20);

// 先启动流作业并完成一次正常 checkpoint,确保 coordinator 已经完成一次提交。

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

English comment

@ifndef-SleePy ifndef-SleePy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for contributing. I went through this revision roughly. There are several architectural design issues.

1. Writer checkpoint processing is still coupled with coordinator response

The writer still waits synchronously for the coordinator during checkpoint processing. This puts JM-side latency back on the writer checkpoint path and can become a serious bottleneck for large-parallelism streaming jobs.

2. The failover protocol is too complex and still has coordinator recovery bugs

This PR introduces many extra protocol states to handle failover issues. I am not sure I fully understand all these behaviors, but I think it is hard to cover all corner cases when checkpoint completion/abort, writer task failover, and
JM/coordinator failover happen at the same time.

However, I am pretty sure the coordinator start() and resetToCheckpoint() lifecycle is not correct when global failover happens. I think it should be easy to reproduce the bug by running a streaming job with checkpointing enabled and killing
the JM after some checkpoints have completed successfully. The OperatorCoordinator failover scenario is much more complex than the assumption made in this PR.

3. Coordinator state does not include CommitListener state

The coordinator checkpoint state only persists commitUser. Once commit runs in the coordinator, the CommitListener state also belongs to the coordinator side, but it is not persisted in the coordinator state.

new SerializedValue<CoordinationRequest>(request);
FileInfoReceivedResponse response =
CoordinationResponseUtils.unwrap(
gateway.sendRequestToCoordinator(operatorId, serializedRequest).get());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a synchronous call on the writer side. The writer thread will block here until the coordinator processes the request and returns the ACK.
In high-parallelism jobs, this can become a serious performance bottleneck because checkpoint processing now depends on a synchronous JM-side request/response path.

freshInstance = false;
LOG.info("PWC snapshot commitUser={}, checkpointId={}", commitUser, checkpointId);
checkState(commitUser != null, "PWC has not been started.");
result.complete(serializeCoordinatorState(commitUser));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The coordinator checkpoint state only contains commitUser. Any committer/listener state owned by the coordinator-side committer is not persisted here.

public void resetToCheckpoint(long checkpointId, byte[] bytes) throws Exception {
LOG.info("PWC resetToCheckpoint: checkpointId={}, fresh={}", checkpointId, freshInstance);
if (freshInstance && checkpointId >= 0) {
checkState(!started, "PWC can only be restored before it is started.");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not cover the lifecycle where resetToCheckpoint() is invoked after start().

@ifndef-SleePy

Copy link
Copy Markdown
Contributor

If you don't mind, I will open the second PR under #8220.

You can compare it with this PR and check whether the concerns I raised above are valid.

Also, I think it would be helpful to communicate through the mailing list, GitHub issue, or any other convenient place before coding. That should help reduce duplicated work.

long maxWatermark() {
long watermark = Long.MIN_VALUE;
for (SubtaskFileInfo fileInfo : fileInfos.values()) {
watermark = Math.max(watermark, fileInfo.request().watermark());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should preserve Flink input-watermark semantics and aggregate the writer watermarks with min, not max. The previous single committer operator only advanced its input watermark after all upstream channels advanced, so a checkpoint with writer watermarks [1000, 0] would commit watermark 0. With max here the coordinator can commit watermark 1000, which may advance snapshot watermark / partition-mark-done too early while slower writers can still produce older partitions.


private void removePendingSubtask(int subtask) {
for (PendingCheckpoint checkpoint : checkpoints.values()) {
if (!checkpoint.staged()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Staged checkpoints from this attempt still need to be invalidated if the attempt fails before the checkpoint completes. The coordinator stages file info as soon as all requests arrive, but the Flink checkpoint can still abort afterwards. With parallelism 1, subtask 0 can send CK1 and have CK1 marked staged, then fail before CK1 completes; removePendingSubtask leaves the staged CK1 files here. The replacement attempt restores from the previous completed checkpoint and replays the same records into CK2, and completing CK2 then commits both the stale CK1 files from the failed attempt and the replayed CK2 files. Please drop staged data produced by failed attempts until the corresponding checkpoint has completed, or reset the coordinator state for this failure path.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants