fix(kafka): prevent data loss on cooperative rebalance in unordered reader #4009

TayPark wants to merge 2 commits into redpanda-data:main
Conversation
…eader

Block MarkCommitRecords for revoked partitions and flush batcher on SoftStop to prevent offset advancement without a delivery guarantee.

When a cooperative-sticky rebalance revokes partitions while batches are in-flight, onAck() could call MarkCommitRecords on already-revoked partitions. franz-go unconditionally re-inserts these into g.uncommitted, and the next auto-commit advances the offset past undelivered data.

Changes:
- Add revoked partition tracking to checkpointTracker
- Skip MarkCommitRecords for revoked partitions in commitFn
- Clear revoked state on partition re-assignment (OnPartitionsAssigned)
- Flush batcher contents on SoftStop instead of discarding

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Code review

Found 2 issues:
Areas for human review

The following areas had observations where the automated review was not confident enough to flag as issues. A human reviewer should verify these.
Generated with Claude Code
Pass the close context from partitionTracker.close() to the SoftStop handler in loop(), replacing context.Background(). This prevents infinite blocking on outBatchChan when the reader is gone during cooperative rebalance.

Also fix unused-parameter lint warnings in tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Hi @mmatczuk, thank you for the thorough review. I've addressed both issues in the latest commit:
Could you approve the CI workflow run when you get a chance? Looking forward to your feedback.
Fixes #4010
We've been running Vector as our Kafka-to-S3 sink connector, but kept hitting two reliability problems: S3 uploads failing when IRSA tokens expired (Vector doesn't refresh tokens gracefully — it drops the batch), and OOM crashes from the adaptive concurrency controller under bursty load. We started a PoC with Redpanda Connect to replace it.
Our pipeline uses input batching with a deterministic S3 path so that reprocessed data overwrites the same file — at-least-once with idempotent writes:
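For reference, a pipeline along these lines might look roughly like the sketch below. Broker addresses, batching thresholds, and the exact path interpolation are illustrative placeholders, and the field names should be verified against the Redpanda Connect docs for your version:

```yaml
input:
  kafka_franz:
    seed_brokers: [ "broker-1:9092" ]
    topics: [ "events" ]
    consumer_group: connect-s3-sink
    # input-level batching: one upload per flush window
    batching:
      count: 10000
      period: 30s

output:
  aws_s3:
    bucket: my-archive
    # deterministic path: reprocessing the same batch overwrites the
    # same object, so at-least-once delivery stays idempotent
    path: '${! meta("kafka_topic") }/${! meta("kafka_partition") }/${! meta("kafka_offset") }.jsonl'
```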
We tested rebalancing by triggering HPA scale-ups. Redpanda Connect generally handled it better than Vector, but during one cooperative-sticky rebalance we saw that 7 revoked partitions each lost exactly one batcher window of messages (10,433 total). We traced through the source and found the root cause.
The bug
Two things go wrong:

1. **`MarkCommitRecords` after revoke:** franz-go doesn't validate partition ownership in `MarkCommitRecords`. After `revoke()` deletes the partition from `g.uncommitted`, a late call re-inserts it, and the next auto-commit cycle commits it.
2. **Batcher silent discard:** `loop()` returns immediately on SoftStop (L277) without flushing buffered messages. One batcher window of data is discarded.

The fix
**Block `MarkCommitRecords` for revoked partitions.** `checkpointTracker` gets a `revoked` set. `removeTopicPartitions` marks partitions as revoked before deleting. `commitFn` checks the set and skips `MarkCommitRecords` for revoked partitions. `OnPartitionsAssigned` clears the set on re-assignment.

**Flush batcher on SoftStop.** When the partition tracker's loop receives SoftStop, it flushes remaining buffered messages into the pipeline before returning.
Test plan

- `TestCheckpointTracker_RevokedPartitionBlocksCommit` — revoke partition, ack in-flight batch, verify commitFn skipped
- `TestCheckpointTracker_ClearRevokedOnReassign` — revoke then re-assign, verify revoked state cleared
- `TestCheckpointTracker_SoftStopFlushesBatcher` — buffer messages below threshold, SoftStop, verify all flushed
- `TestPartitionCacheOrdering` / `TestPartitionCacheBatching` pass (no regression)
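The flush-on-SoftStop behavior exercised by the batcher test can be sketched with plain channels. The `batcher` type and the `loop` signature here are hypothetical stand-ins, not the real source:

```go
package main

import "fmt"

// batcher buffers messages below the flush threshold.
type batcher struct {
	buf []string
}

func (b *batcher) add(msg string) { b.buf = append(b.buf, msg) }

// drain returns and clears any buffered messages.
func (b *batcher) drain() []string {
	out := b.buf
	b.buf = nil
	return out
}

// loop consumes msgs until stop is closed; on SoftStop it flushes the
// partial batch downstream instead of returning immediately.
func loop(msgs <-chan string, stop <-chan struct{}, out chan<- []string) {
	b := &batcher{}
	for {
		select {
		case m := <-msgs:
			b.add(m)
		case <-stop:
			if batch := b.drain(); len(batch) > 0 {
				out <- batch // flush instead of discard
			}
			close(out)
			return
		}
	}
}

func main() {
	msgs := make(chan string) // unbuffered: sends complete only once received
	stop := make(chan struct{})
	out := make(chan []string, 1)

	go loop(msgs, stop, out)
	msgs <- "a" // below the flush threshold, so still buffered
	msgs <- "b"
	close(stop) // SoftStop with a partial batch in the buffer

	for batch := range out {
		fmt.Println(len(batch)) // both messages flushed, none discarded
	}
}
```

Because `msgs` is unbuffered, both sends are known to have reached the loop before `close(stop)`, which makes the flushed batch size deterministic.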