[Feature] PIP-30: Run commit on the JobManager for unaware-bucket append tables #8401

Open
ifndef-SleePy wants to merge 6 commits into apache:master from ifndef-SleePy:coordinator-commit-pr2-write-coordinator

@ifndef-SleePy

Contributor

Purpose

This PR is the core part of [PIP-30 (#8220)]: enabling Paimon's commit step to run on the Flink JobManager as an OperatorCoordinator instead of in a downstream committer operator, so that each writer subtask becomes a self-contained failover region and a single TM failure no longer restarts the whole pipeline.

The scope is intentionally limited to the main functionality — coordinator-based commit for streaming unaware-bucket append tables — gated by a new option sink.coordinator-commit.enabled (default false). Secondary features are split into follow-up PRs to keep this one reviewable.

Built on top of the Committer.Context refactor merged as PR #8221.

Key design

Why a separate coordinator (not the existing WriteOperatorCoordinator)

Two reasons:

  1. No functional overlap with WriteOperatorCoordinator. The existing WriteOperatorCoordinator serves write-restore / manifest-scan requests for tables whose writers need to recover their in-memory file view on restart — primary-key tables and fixed-bucket append tables. Unaware-bucket append tables do not go through that path at all (AppendFileStoreWrite forces ignorePreviousFiles = true, so the manifest-scan code is dead for them). This PR only addresses region failover for unaware-bucket append tables, so the two coordinators have no shared state and no shared channel; merging them would only entangle two independent features.

  2. Stateful OperatorCoordinator recovery is hard. A coordinator that owns durable state has to handle many orderings of start / resetToCheckpoint / subtaskReset / failover, plus the JM event-loop threading model. Implementing this from scratch produces enough corner cases to burden both reviewers and future maintainers. Following the FLIP-27 source coordinator design (and the in-tree DataStatisticsCoordinator precedent), the implementation was refactored before being contributed back to the community so that this complexity is delegated to RecreateOnResetOperatorCoordinator: on every reset the framework discards the inner coordinator and constructs a new one, collapsing the reset/start state machine to a single trivial branch. RecreateOnResetOperatorCoordinator is incompatible with CoordinationRequestHandler (it does not forward handleCoordinationRequest), so reusing WriteOperatorCoordinator, which implements CoordinationRequestHandler for the manifest-scan channel, would forfeit this simplification.

Coordinator-side state via an in-memory state backend

OperatorCoordinator only persists a byte[] per checkpoint, whereas the existing committer-side code is written against Flink's keyed/operator state interfaces (ListState, BroadcastState, etc.). To avoid forking the committer code, this PR introduces MemoryBackendStateStore, an in-memory implementation of Paimon's StateStore abstraction (introduced in PR #8221) whose list/broadcast states are backed by HashMaps. On checkpoint, all states are serialized into a single CoordinatorState byte array; on restore, the bytes are deserialized back into the in-memory backend before the committer is reconstructed. The committer itself sees the same StateStore API as in the operator path.
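As a rough illustration of the snapshot/restore round trip described above, here is a minimal sketch in plain Java. It is not the PR's MemoryBackendStateStore: the class name InMemoryListStates, its methods, and the byte layout are all assumptions made for this example, and it only models list states holding already-serialized elements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for an in-memory state store; not Paimon's actual class. */
final class InMemoryListStates {
    // State name -> serialized elements of that list state (sorted for a stable layout).
    private final Map<String, List<byte[]>> states = new TreeMap<>();

    /** Returns the named list state, creating it on first access. */
    List<byte[]> listState(String name) {
        return states.computeIfAbsent(name, k -> new ArrayList<>());
    }

    /** Serializes every named state into one byte array (assumed layout, for illustration). */
    byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(states.size());
            for (Map.Entry<String, List<byte[]>> entry : states.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue().size());
                for (byte[] element : entry.getValue()) {
                    out.writeInt(element.length);
                    out.write(element);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds the in-memory states from a byte array produced by snapshot(). */
    static InMemoryListStates restore(byte[] snapshot) {
        InMemoryListStates store = new InMemoryListStates();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            int stateCount = in.readInt();
            for (int i = 0; i < stateCount; i++) {
                List<byte[]> elements = store.listState(in.readUTF());
                int elementCount = in.readInt();
                for (int j = 0; j < elementCount; j++) {
                    byte[] element = new byte[in.readInt()];
                    in.readFully(element);
                    elements.add(element);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return store;
    }
}
```

In this sketch the committer-facing code would only ever touch listState(...), while the coordinator calls snapshot() at checkpoint time and restore(...) before the committer is rebuilt.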

How it works

  • Writer subtasks buffer per-checkpoint committables in an independent operator state and forward them to the coordinator over the OperatorEvent channel during snapshotState.
  • The coordinator runs everything inside a single-thread event loop; notifyCheckpointComplete(cp) triggers an asynchronous commit for all subtask committables aligned at cp.
  • The coordinator does not persist uncommitted committables. On global failover, writers replay their pending committables from their own state on initializeState; the coordinator waits in RESTORING, aligns them, recommits the aligned checkpoint, and then intentionally fails the job once to force a clean restart (failoverAfterRecovery).
  • The default committer-operator path is untouched. FlinkSink#doCommit early-returns into the coordinator-commit branch only when the option is on and a strict set of preconditions hold (streaming + checkpointing, append-only, unaware-bucket, write-only=true, precommit-compact=false, no auto-tag, max-concurrent-checkpoints=1). Any violation is rejected at sink build time.
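The alignment step in the list above can be sketched as follows. This is a simplified model, not the PR's code: CommittableAligner, its method names, and the use of String as the committable type are assumptions for illustration, and it ignores the RESTORING state, region failover, and asynchronous commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-checkpoint alignment buffer; names are illustrative only. */
final class CommittableAligner {
    private final int parallelism;
    // checkpointId -> (subtask index -> committables reported for that checkpoint)
    private final Map<Long, Map<Integer, List<String>>> pending = new TreeMap<>();

    CommittableAligner(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Records one writer report; writers report even when they have nothing to commit. */
    void report(int subtask, long checkpointId, List<String> committables) {
        Map<Integer, List<String>> perSubtask =
                pending.computeIfAbsent(checkpointId, k -> new TreeMap<>());
        if (perSubtask.putIfAbsent(subtask, new ArrayList<>(committables)) != null) {
            throw new IllegalStateException(
                    "Duplicate report from subtask " + subtask + " for checkpoint " + checkpointId);
        }
    }

    /** A checkpoint is aligned once every subtask has reported for it. */
    boolean isAligned(long checkpointId) {
        Map<Integer, List<String>> perSubtask = pending.get(checkpointId);
        return perSubtask != null && perSubtask.size() == parallelism;
    }

    /** Removes and returns, in subtask order, everything to commit for a completed checkpoint. */
    List<String> drain(long checkpointId) {
        if (!isAligned(checkpointId)) {
            throw new IllegalStateException("Not all committables reported by writer");
        }
        List<String> toCommit = new ArrayList<>();
        for (List<String> committables : pending.remove(checkpointId).values()) {
            toCommit.addAll(committables);
        }
        return toCommit;
    }
}
```

In this model, notifyCheckpointComplete(cp) would correspond to calling drain(cp) on the event-loop thread and handing the result to the committer.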

Out of scope (follow-up PRs)

  • watermark handling
  • end-of-input handling
  • failover IT case
  • auto-tag-for-savepoint on the coordinator path
  • concurrent checkpoints support

Tests

  • CommittingWriteOperatorCoordinatorTest — coordinator-side unit tests (snapshot/restore, region failover alignment, partial failover, precondition state machine).
  • CommittableEventTest / WriterCommittablesTest — unit tests for the committable event payload and the per-subtask buffer used inside the coordinator.
  • CoordinatorCommittingRowDataStoreWriteOperatorTest — writer-side unit tests.
  • FlinkSinkTest — precondition checks for the option.
  • CoordinatorCommitITCase — end-to-end streaming IT.

+ state));
return;
}
committer.snapshotState();

Contributor

If committer.snapshotState() or the state serialization below throws, runInEventLoop catches the exception and only calls context.failJob(t), leaving the CompletableFuture passed to checkpointCoordinator incomplete. The Flink OperatorCoordinator contract requires this future to be completed either successfully or exceptionally; otherwise the coordinator checkpoint can hang until the checkpoint timeout instead of being aborted immediately. Please complete result exceptionally on any error in this checkpoint path, for example by using a checkpoint-specific executor wrapper that has access to the future.
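A minimal sketch of such a wrapper, in plain Java without Flink dependencies. CheckpointEventLoop and runCheckpoint are hypothetical names, not code from this PR; the point is only that the future is completed on both the success and the failure path.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical checkpoint-specific wrapper around the coordinator's event loop. */
final class CheckpointEventLoop {
    private final Executor eventLoop;

    CheckpointEventLoop(Executor eventLoop) {
        this.eventLoop = eventLoop;
    }

    /** Runs the snapshot action on the event loop and always completes the given future. */
    void runCheckpoint(Callable<byte[]> snapshotAction, CompletableFuture<byte[]> result) {
        eventLoop.execute(
                () -> {
                    try {
                        result.complete(snapshotAction.call());
                    } catch (Throwable t) {
                        // Never leave the future pending: surface the failure to the caller.
                        result.completeExceptionally(t);
                    }
                });
    }
}
```

Whether the job should additionally be failed in the catch branch is a separate decision that this sketch leaves open.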

Contributor Author

Nice catch, fixed.

runInEventLoop(
() -> {
if (event instanceof CommittableEvent) {
handleCommittableEvent(subtask, (CommittableEvent) event);

handleEventFromOperator receives the attemptNumber, but the coordinator never fences it: executionAttemptReady / executionAttemptFailed are no-ops and events are merged only by subtask. If an event from a failed/stale attempt is delivered after a region failover, the coordinator can accept its committables as if they came from the current writer attempt. Depending on checkpoint ids this may either fail the job as a duplicate/older checkpoint, or worse, merge stale committables into a later successful checkpoint and commit data from an invalid attempt. I think the coordinator should track the valid attempt per subtask to avoid the “The PaimonWriterCoordinator receives the File Info of the failed task” problem described in the design doc: https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0

Contributor Author

Thanks for reviewing. That's a really good point. This was considered during the initial design; after carefully reviewing Flink's OperatorCoordinator implementation, I simplified this part. Flink effectively does the fencing: in DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator, events from non-RUNNING executions are filtered out, so the coordinator will not receive events from a failed task. The only remaining case is an event that arrives before the coordinator is notified that the attempt has failed, and resetting in the subtaskReset method is enough to handle that.

// writers always report a committable per (subtask, checkpoint) during
// snapshot, even if empty; missing means the writer is broken
if (!alignCommittables(checkpointId)) {
throw new IllegalStateException("Not all committables reported by writer");

The writer and coordinator process committables asynchronously. It is possible for a writer to finish its checkpoint while the coordinator has not yet collected all committable/file info events, for example due to network delay or coordinator event-loop lag. In such cases, alignCommittables failure may only mean delayed event processing rather than an unrecoverable error; the missing file info may still arrive later if the writer snapshot has succeeded.
The design doc currently describes the Writer-Coordinator FileInfo handoff as synchronous. I agree that the async design can reduce blocking in the writer checkpoint path, but then the design doc should be updated to describe the actual async protocol.

Contributor Author

It is impossible for a writer to finish its checkpoint while the coordinator has not yet collected all committable/file info events, because Flink RPC guarantees FIFO ordering between messages from the same sender to the same RpcEndpoint. Both sendOperatorEventToCoordinator and acknowledgeCheckpoint land on the same JobMasterGateway mailbox (it extends both JobMasterOperatorEventGateway and CheckpointCoordinatorGateway), and the writer issues them serially on the task thread: the event first (in prepareSnapshotPreBarrier) and the ACK last. The coordinator therefore always sees the event before the checkpoint is considered complete.

And you are right about the second point — the design doc does describe the handoff as synchronous, which is inaccurate. I missed that part of the original design doc and will correct it. Actually, if the task fails to send the event to the coordinator, Flink itself will trigger a failover — that's part of the RPC's own behavior, not something we handle explicitly.
