fix(kafka): prevent data loss on cooperative rebalance in unordered reader #4009

Open
TayPark wants to merge 2 commits into redpanda-data:main from TayPark:fix/unordered-reader-rebalance-dataloss

Conversation


@TayPark TayPark commented Feb 23, 2026

Fixes #4010


We've been running Vector as our Kafka-to-S3 sink connector, but kept hitting two reliability problems: S3 uploads failing when IRSA tokens expired (Vector doesn't refresh tokens gracefully — it drops the batch), and OOM crashes from the adaptive concurrency controller under bursty load. We started a PoC with Redpanda Connect to replace it.

  Kafka ───► Redpanda Connect (unordered_processing) ───► S3
              per-partition input batcher (50MB / 1min)
              deterministic path: topic+partition+offset.jsonl.zst

Our pipeline uses input batching with a deterministic S3 path so that reprocessed data overwrites the same file — at-least-once with idempotent writes:

input:
  redpanda:
    consumer_group: my-group
    unordered_processing:
      enabled: true
      checkpoint_limit: 1
      batching:
        byte_size: 52428800  # 50 MiB per partition
        period: "1m"

output:
  aws_s3:
    path: '${!meta("kafka_topic")}+${!meta("kafka_partition")}+${!meta("kafka_offset")}.jsonl.zst'
    max_in_flight: 10
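
The at-least-once-with-idempotent-writes argument can be sketched in a few lines of Go. This is illustrative only: `fakeS3` and `objectKey` are stand-ins invented here, not part of the pipeline; the key format mirrors the deterministic path in the config above.

```go
package main

import "fmt"

// fakeS3 stands in for the bucket: keyed writes overwrite in place.
type fakeS3 map[string]string

// objectKey mirrors the deterministic path from the config above
// (topic+partition+offset.jsonl.zst).
func objectKey(topic string, partition int32, firstOffset int64) string {
	return fmt.Sprintf("%s+%d+%d.jsonl.zst", topic, partition, firstOffset)
}

func main() {
	s3 := fakeS3{}
	// First delivery of the batch starting at offset 1000.
	s3[objectKey("events", 3, 1000)] = "batch-v1"
	// Redelivery after a rebalance rewrites the same key: an
	// overwrite, not a duplicate object.
	s3[objectKey("events", 3, 1000)] = "batch-v1"
	fmt.Println(len(s3)) // → 1, regardless of how often the range is reprocessed
}
```

Because the key depends only on topic, partition, and first offset, reprocessing a range after a rebalance converges to the same objects.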

We tested rebalancing by triggering HPA scale-ups. Redpanda Connect generally handled it better than Vector, but during one cooperative-sticky rebalance we saw that 7 revoked partitions each lost exactly one batcher window of messages (10,433 total). We traced through the source and found the root cause.

The bug

 Timeline          Consumer A                        Kafka broker
 ───────────────────────────────────────────────────────────────────

 T0  normal        sendBatch()
                     Track(r)
                     batchChan ← batch
                     pipeline processing...

 T1  rebalance     OnPartitionsRevoked:
                     CommitMarkedOffsets()    ← in-flight batch NOT marked
                     removeTopicPartitions()
                       close() → SoftStop    ← batcher discards buffered msgs
                       delete from map
                     return                  ───► partition assigned to Consumer B

 T2  pipeline      onAck() → commitFn()
     completes       MarkCommitRecords(r)    ← no ownership check in franz-go
                                                re-inserts into g.uncommitted

 T3  auto-commit   commits offset           ───► broker accepts
                                                  Consumer B starts past the gap
                                                  data LOST

Two things go wrong:

  1. MarkCommitRecords after revoke: franz-go doesn't validate partition ownership in MarkCommitRecords. After revoke() deletes the partition from g.uncommitted, a late call re-inserts it, and the next auto-commit cycle commits it.

  2. Batcher silent discard: loop() returns immediately on SoftStop (L277) without flushing buffered messages. One batcher window of data is discarded.

The fix

Block MarkCommitRecords for revoked partitions. checkpointTracker gets a revoked set. removeTopicPartitions marks partitions as revoked before deleting. commitFn checks the set and skips MarkCommitRecords for revoked partitions. OnPartitionsAssigned clears the set on re-assignment.

Flush batcher on SoftStop. When the partition tracker's loop receives SoftStop, it flushes remaining buffered messages into the pipeline before returning.

 With fix:

 T1  OnPartitionsRevoked:
       SoftStop → batcher.Flush() → sendBatch()    ← buffered msgs delivered
       revoked[partition] = marked

 T2  onAck() → commitFn()
       isRevoked() == true → skip MarkCommitRecords ← offset NOT advanced

 T3  Consumer B starts at last committed offset
       reprocesses same range → same S3 path        ← idempotent, no loss
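
The revoked-partition gate described above can be modeled with a small stdlib-only sketch. The names (`revokedSet`, `markRevoked`, `isRevoked`, `clearRevoked`) are illustrative, not the PR's exact code, but the locking and call sites follow the description: mark in OnPartitionsRevoked, check in commitFn, clear in OnPartitionsAssigned.

```go
package main

import (
	"fmt"
	"sync"
)

// tp identifies a topic/partition pair.
type tp struct {
	topic     string
	partition int32
}

// revokedSet models the ownership check the fix adds to checkpointTracker.
type revokedSet struct {
	mu      sync.Mutex
	revoked map[tp]struct{}
}

func newRevokedSet() *revokedSet {
	return &revokedSet{revoked: map[tp]struct{}{}}
}

// markRevoked runs in OnPartitionsRevoked, before partition state is
// deleted, so late acks observe it.
func (s *revokedSet) markRevoked(topic string, partition int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.revoked[tp{topic, partition}] = struct{}{}
}

// isRevoked gates commitFn: true means MarkCommitRecords is skipped
// and the offset stays put for the new owner to reprocess.
func (s *revokedSet) isRevoked(topic string, partition int32) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.revoked[tp{topic, partition}]
	return ok
}

// clearRevoked runs in OnPartitionsAssigned when the partition comes
// back to this consumer.
func (s *revokedSet) clearRevoked(topic string, partition int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.revoked, tp{topic, partition})
}

func main() {
	s := newRevokedSet()
	s.markRevoked("events", 3)
	fmt.Println(s.isRevoked("events", 3)) // true: late ack, commit skipped
	s.clearRevoked("events", 3)
	fmt.Println(s.isRevoked("events", 3)) // false: re-assigned, commits resume
}
```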

Test plan

  • TestCheckpointTracker_RevokedPartitionBlocksCommit — revoke partition, ack in-flight batch, verify commitFn skipped
  • TestCheckpointTracker_ClearRevokedOnReassign — revoke then re-assign, verify revoked state cleared
  • TestCheckpointTracker_SoftStopFlushesBatcher — buffer messages below threshold, SoftStop, verify all flushed
  • Existing TestPartitionCacheOrdering / TestPartitionCacheBatching pass (no regression)
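
The flush-on-SoftStop behavior the third test exercises can be sketched independently of the real partition tracker. `run` below is a stand-in for the tracker loop, with channels in place of the real batcher and pipeline; the shape of the select is the point, not the names.

```go
package main

import "fmt"

// run models the partition tracker loop: it buffers messages and, on
// soft stop, flushes the partial buffer downstream instead of
// discarding it.
func run(in <-chan string, softStop <-chan struct{}, out chan<- []string, batchSize int) {
	var buf []string
	for {
		select {
		case msg := <-in:
			buf = append(buf, msg)
			if len(buf) >= batchSize {
				out <- buf
				buf = nil
			}
		case <-softStop:
			if len(buf) > 0 {
				out <- buf // the fix: deliver, don't drop
			}
			close(out)
			return
		}
	}
}

func main() {
	in := make(chan string)
	softStop := make(chan struct{})
	out := make(chan []string, 2)
	done := make(chan struct{})
	go func() { run(in, softStop, out, 3); close(done) }()
	in <- "a"
	in <- "b" // still below the batch threshold of 3
	close(softStop)
	<-done
	fmt.Println(<-out) // [a b] — the partial window survives the stop
}
```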

…eader

Block MarkCommitRecords for revoked partitions and flush batcher on
SoftStop to prevent offset advancement without delivery guarantee.

When a cooperative-sticky rebalance revokes partitions while batches are
in-flight, onAck() could call MarkCommitRecords on already-revoked
partitions. franz-go unconditionally re-inserts these into g.uncommitted,
and the next auto-commit advances the offset past undelivered data.

Changes:
- Add revoked partition tracking to checkpointTracker
- Skip MarkCommitRecords for revoked partitions in commitFn
- Clear revoked state on partition re-assignment (OnPartitionsAssigned)
- Flush batcher contents on SoftStop instead of discarding

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

CLAassistant commented Feb 23, 2026

CLA assistant check
All committers have signed the CLA.

@mmatczuk
Contributor

Code review

Found 2 issues:

  1. SoftStop flush uses context.Background() — can block indefinitely (bugs: bounded blocking)

    The flush added in the SoftStop handler calls both p.batcher.Flush(context.Background()) and p.sendBatch(context.Background(), ...). sendBatch pushes to the unbuffered outBatchChan. With context.Background(), this has no deadline and blocks forever if nothing reads from the channel.

    Since close() waits on HasStoppedChan() (which fires when loop() returns), and loop() can't return while sendBatch is blocked, this can stall the OnPartitionsRevoked callback and the entire rebalance. The close(ctx) escape hatch lets the caller eventually proceed if its context expires, but the goroutine leaks.

    Consider threading the ctx from close() into the SoftStop handler (e.g., via a stored context or channel), or using a derived context with a timeout.

    if p.batcher != nil {
        p.batcherLock.Lock()
        if batch, _ := p.batcher.Flush(context.Background()); len(batch) > 0 {
            record := p.topBatchRecord
            p.topBatchRecord = nil
            p.batcherLock.Unlock()
            _ = p.sendBatch(context.Background(), batch, record)
        } else {
            p.batcherLock.Unlock()
        }
    }

  2. Commit message format (commit policy: "system: message" or "system(subsystem): message")

    The commit fix(kafka): prevent data loss on cooperative rebalance in unordered reader uses conventional-commits style (type(scope): message). The repo convention is system: message or system(subsystem): message. Expected: kafka: prevent data loss on cooperative rebalance in unordered reader or kafka(unordered-reader): prevent data loss on cooperative rebalance.

Areas for human review

The following areas had observations where the automated review was not confident enough to flag as issues. A human reviewer should verify these.

  1. commitFn field mutated after construction without synchronization — The field is written on L548 after newCheckpointTracker returns on L546, then read later from ack callbacks on arbitrary goroutines. The write happens before any goroutine that reads it starts, so it's benign in practice, but the Go memory model doesn't strictly guarantee visibility without explicit synchronization. The race detector may flag it.

    checkpoints := newCheckpointTracker(f.res, batchChan, func(*kgo.Record) {}, f.batchPolicy)
    if f.consumerGroup != "" {
        checkpoints.commitFn = func(r *kgo.Record) {
            if cl == nil {
                return
            }
            if checkpoints.isRevoked(r.Topic, r.Partition) {
                return
            }
            cl.MarkCommitRecords(r)
        }
    }

  2. revoked map grows monotonically for partitions reassigned to other consumers. clearRevoked is called from OnPartitionsAssigned and only clears partitions assigned to this consumer. In a cooperative-sticky rebalance, revoked partitions typically go to other consumers, so those entries are never cleared. Each entry is small (string key + int32 + empty struct), so the memory impact is negligible, but the map never shrinks over the lifetime of the consumer.

    if c.revoked[topicName] == nil {
        c.revoked[topicName] = map[int32]struct{}{}
    }
    c.revoked[topicName][lostPartition] = struct{}{}

Generated with Claude Code

Pass the close context from partitionTracker.close() to the SoftStop
handler in loop(), replacing context.Background(). This prevents
infinite blocking on outBatchChan when the reader is gone during
cooperative rebalance.

Also fix unused parameter lint warnings in tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Author

TayPark commented Mar 12, 2026

Hi @mmatczuk, thank you for the thorough review.

I've addressed both issues in the latest commit:

  1. Bounded context for SoftStop flush: partitionTracker now receives the close context from close(ctx) and uses it in the SoftStop handler instead of context.Background(). The happens-before guarantee is provided by the channel close in TriggerSoftStop().
  2. Lint fix: Renamed unused parameter r to _ in test callbacks.

Could you approve the CI workflow run when you get a chance? Looking forward to your feedback.


Development

Successfully merging this pull request may close these issues.

Unordered kafka reader: data loss on cooperative rebalance (MarkCommitRecords after revoke)

3 participants