[Feature] PIP-30: Run commit on the JobManager for unaware-bucket append tables#8401
[Feature] PIP-30: Run commit on the JobManager for unaware-bucket append tables#8401ifndef-SleePy wants to merge 6 commits into
Conversation
… commit on the JobManager for unaware-bucket append tables
…nk.coordinator-commit.enabled
| + state)); | ||
| return; | ||
| } | ||
| committer.snapshotState(); |
There was a problem hiding this comment.
If committer.snapshotState() or the state serialization below throws, runInEventLoop catches the exception and only calls context.failJob(t), leaving the CompletableFuture passed to checkpointCoordinator incomplete. The Flink OperatorCoordinator contract requires this future to be completed either successfully or exceptionally; otherwise the coordinator checkpoint can hang until the checkpoint timeout instead of being aborted immediately. Please complete result exceptionally on any error in this checkpoint path, for example by using a checkpoint-specific executor wrapper that has access to the future.
There was a problem hiding this comment.
Nice catch, fixed.
| runInEventLoop( | ||
| () -> { | ||
| if (event instanceof CommittableEvent) { | ||
| handleCommittableEvent(subtask, (CommittableEvent) event); |
There was a problem hiding this comment.
handleEventFromOperator receives the attemptNumber, but the coordinator never fences it: executionAttemptReady / executionAttemptFailed are no-ops and events are merged only by subtask. If an event from a failed/stale attempt is delivered after a region failover, the coordinator can accept its committables as if they came from the current writer attempt. Depending on checkpoint ids this may either fail the job as a duplicate/older checkpoint, or worse, merge stale committables into a later successful checkpoint and commit data from an invalid attempt. I think it should add valid attempt per subtask to avoid the “The PaimonWriterCoordinator receives the File Info of the failed task” problem described in the https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0
There was a problem hiding this comment.
Thanks for reviewing. That's really a good point. This was actually considered during the initial design; after carefully reviewing Flink's OperatorCoordinator implementation, I simplified this part. This can be understood as Flink doing the fencing: in DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator , events from non- RUNNING executions are filtered out, so the coordinator will not receive events from a failed task. The only case is that an event is received first and then the coordinator is notified that the attempt has failed, so resetting in the subtaskReset method is enough.
| // writers always report a committable per (subtask, checkpoint) during | ||
| // snapshot, even if empty; missing means the writer is broken | ||
| if (!alignCommittables(checkpointId)) { | ||
| throw new IllegalStateException("Not all committables reported by writer"); |
There was a problem hiding this comment.
The writer and coordinator process committables asynchronously. It is possible for a writer to finish its checkpoint while the coordinator has not yet collected all committable/file info events, for example due to network delay or coordinator event-loop lag. In such cases, alignCommittables failure may only mean delayed event processing rather than an unrecoverable error; the missing file info may still arrive later if the writer snapshot has succeeded.
The design doc currently describes the Writer-Coordinator FileInfo handoff as synchronous. I agree that the async design can reduce blocking in the writer checkpoint path, but then the design doc should be updated to describe the actual async protocol.
There was a problem hiding this comment.
It is impossible for a writer to finish its checkpoint while the coordinator has not yet collected all committable/file info events, because Flink RPC guarantees FIFO ordering between messages from the same sender to the same RpcEndpoint . Both sendOperatorEventToCoordinator and acknowledgeCheckpoint land on the same JobMasterGateway mailbox (it extends both JobMasterOperatorEventGateway and CheckpointCoordinatorGateway), and the writer issues them serially on the task thread — the event first (in prepareSnapshotPreBarrier) and the ACK last — so the coordinator always sees the event before the checkpoint is considered complete.
And you are right about the second point — the design doc does describe the handoff as synchronous, which is inaccurate. I missed that part of the original design doc and will correct it. Actually, if the task fails to send the event to the coordinator, Flink itself will trigger a failover — that's part of the RPC's own behavior, not something we handle explicitly.
There was a problem hiding this comment.
Thanks for pointing it out, I've fixed the doc: https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0
Purpose
This PR is the core part of [PIP-30 (#8220)]: enabling Paimon's commit step to run on the Flink JobManager as an
OperatorCoordinatorinstead of in a downstream committer operator, so that each writer subtask becomes a self-contained failover region and a single TM failure no longer restarts the whole pipeline.The scope is intentionally limited to the main functionality — coordinator-based commit for streaming unaware-bucket append tables — gated by a new option
sink.coordinator-commit.enabled(defaultfalse). Secondary features are split into follow-up PRs to keep this one reviewable.Built on top of the
Committer.Contextrefactor merged as PR #8221.Key design
Why a separate coordinator (not the existing
WriteOperatorCoordinator)Two reasons:
No functional overlap with
WriteOperatorCoordinator. The existingWriteOperatorCoordinatorserves write-restore / manifest-scan requests for tables whose writers need to recover their in-memory file view on restart — primary-key tables and fixed-bucket append tables. Unaware-bucket append tables do not go through that path at all (AppendFileStoreWriteforcesignorePreviousFiles = true, so the manifest-scan code is dead for them). This PR only addresses region failover for unaware-bucket append tables, so the two coordinators have no shared state and no shared channel; merging them would only entangle two independent features.Stateful
OperatorCoordinatorrecovery is hard. A coordinator that owns durable state has to deal with many combinations ofstart/resetToCheckpoint/subtaskReset/ failover ordering, plus the JM event-loop threading model. Implementing this from scratch produces enough corner cases to be a real burden on both reviewers and future maintainers. Following the FLIP-27 source coordinator design (and the in-treeDataStatisticsCoordinatorprecedent), the implementation was refactored on the way back to the community to delegate this complexity toRecreateOnResetOperatorCoordinator: on every reset the framework throws away the inner coordinator and constructs a brand-new one, collapsing the reset/start state machine to a single trivial branch.RecreateOnResetOperatorCoordinatoris incompatible withCoordinationRequestHandler(it does not forwardhandleCoordinationRequest), so reusingWriteOperatorCoordinator— which implementsCoordinationRequestHandlerfor the manifest-scan channel — would forfeit this simplification.Coordinator-side state via an in-memory state backend
OperatorCoordinatoronly persists abyte[]per checkpoint, whereas the existing committer-side code is written against Flink's keyed/operator state interfaces (ListState,BroadcastState, etc.). To avoid forking the committer code, this PR introducesMemoryBackendStateStore, an in-memory implementation of Paimon'sStateStoreabstraction (introduced in PR #8221) backed byHashMap-backed list/broadcast states. On checkpoint, all states are serialized into a singleCoordinatorStatebyte array; on restore, the bytes are deserialized back into the in-memory backend before the committer is reconstructed. The committer itself sees the sameStateStoreAPI as in the operator path.How it works
OperatorEventchannel duringsnapshotState.notifyCheckpointComplete(cp)triggers an asynchronous commit for all subtask committables aligned atcp.initializeState; the coordinator waits inRESTORING, aligns them, recommits the aligned checkpoint, and then intentionally fails the job once to force a clean restart (failoverAfterRecovery).FlinkSink#doCommitearly-returns into the coordinator-commit branch only when the option is on and a strict set of preconditions hold (streaming + checkpointing, append-only, unaware-bucket,write-only=true,precommit-compact=false, no auto-tag,max-concurrent-checkpoints=1). Any violation is rejected at sink build time.Out of scope (follow-up PRs)
Tests
CommittingWriteOperatorCoordinatorTest— coordinator-side unit tests (snapshot/restore, region failover alignment, partial failover, precondition state machine).CommittableEventTest/WriterCommittablesTest— unit tests for the committable event payload and the per-subtask buffer used inside the coordinator.CoordinatorCommittingRowDataStoreWriteOperatorTest— writer-side unit tests.FlinkSinkTest— precondition checks for the option.CoordinatorCommitITCase— end-to-end streaming IT.