12 changes: 10 additions & 2 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1057,8 +1057,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId, batch.producerEpoch)) {
  // Check the epoch first: if the producer epoch is stale, throw the recoverable InvalidProducerEpochException.
  val entry = producerStateManager.activeProducers.get(batch.producerId)
  if (entry != null && batch.producerEpoch < entry.producerEpoch) {
    val message = s"Epoch of producer ${batch.producerId} is ${batch.producerEpoch}, which is smaller than the last seen epoch ${entry.producerEpoch}"
    throw new InvalidProducerEpochException(message)
  }
  // Only check verification if the epoch is current.
  if (batchMissingRequiredVerification(batch, requestVerificationGuard))
    throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}
}

// We cache offset metadata for the start of each transaction. This allows us to
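The hunk above boils down to an ordering rule: for a transactional batch with no ongoing transaction, compare the batch epoch against the last seen producer epoch before running the verification-guard check, so a lagging producer gets a recoverable error instead of a fatal one. The following is a minimal, self-contained sketch of that ordering; `ProducerEntry` and `EpochFirstCheck` are illustrative stand-ins for `producerStateManager.activeProducers` and its entries, not Kafka's actual types, and only the two exception classes are the real ones from `org.apache.kafka.common.errors` (assumes kafka-clients on the classpath).

import org.apache.kafka.common.errors.{InvalidProducerEpochException, InvalidTxnStateException}

// Illustrative stand-in for the entry looked up from producerStateManager.activeProducers;
// not Kafka's ProducerStateEntry.
final case class ProducerEntry(producerEpoch: Short)

object EpochFirstCheck {
  // Report a stale epoch as the recoverable InvalidProducerEpochException
  // before running the fatal verification check.
  def check(producerId: Long,
            batchEpoch: Short,
            lastSeenEntry: Option[ProducerEntry],
            missingVerification: Boolean): Unit = {
    lastSeenEntry match {
      case Some(entry) if batchEpoch < entry.producerEpoch =>
        throw new InvalidProducerEpochException(
          s"Epoch of producer $producerId is $batchEpoch, which is smaller than the last seen epoch ${entry.producerEpoch}")
      case _ =>
        if (missingVerification)
          throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
    }
  }
}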
89 changes: 89 additions & 0 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4735,6 +4735,95 @@ class UnifiedLogTest {
}

(log, segmentWithOverflow)

@Test
def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
// The producer epoch gets incremented (coordinator failover, a completed transaction, etc.)
// while the client still holds a stale cached epoch. The fix prevents a fatal InvalidTxnStateException.

val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)

val producerId = 123L
val oldEpoch = 5.toShort
val newEpoch = 6.toShort

// Step 1: Simulate a scenario where producer epoch was incremented to fence the producer
val previousRecords = MemoryRecords.withTransactionalRecords(
Compression.NONE, producerId, newEpoch, 0,
new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
)
val previousGuard = log.maybeStartTransactionVerification(producerId, 0, newEpoch, false) // TV1 = supportsEpochBump = false
log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, previousGuard)

// Complete the transaction normally (commits do update producer state with current epoch)
val commitMarker = MemoryRecords.withEndTransactionMarker(
producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0)
)
log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)

// Step 2: TV1 client tries to write with stale cached epoch (before learning about epoch increment)
val staleEpochRecords = MemoryRecords.withTransactionalRecords(
Compression.NONE, producerId, oldEpoch, 0,
new SimpleRecord("stale-epoch-key".getBytes, "stale-epoch-value".getBytes)
)

// Step 3: Verify our fix - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal)
val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
val staleGuard = log.maybeStartTransactionVerification(producerId, 0, oldEpoch, false)
log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
})

// Verify the error message indicates epoch mismatch
assertTrue(exception.getMessage.contains("smaller than the last seen epoch"))
assertTrue(exception.getMessage.contains(s"$oldEpoch"))
assertTrue(exception.getMessage.contains(s"$newEpoch"))
}

@Test
def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
// Check producer epoch FIRST - if stale, return recoverable error before verification checks.

val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)

val producerId = 456L
val originalEpoch = 3.toShort
val bumpedEpoch = 4.toShort

// Step 1: Start transaction with epoch 3 (before timeout)
val initialRecords = MemoryRecords.withTransactionalRecords(
Compression.NONE, producerId, originalEpoch, 0,
new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
)
val initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, initialGuard)

// Step 2: Coordinator times out and aborts transaction
// TV2 (KIP-890): Coordinator bumps epoch from 3 → 4 and sends abort marker with epoch 4
val abortMarker = MemoryRecords.withEndTransactionMarker(
producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
)
log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)

// Step 3: TV2 transactional producer tries to append with stale epoch (timeout recovery scenario)
val staleEpochRecords = MemoryRecords.withTransactionalRecords(
Compression.NONE, producerId, originalEpoch, 0,
new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
)

// Step 4: Verify our fix works for TV2 - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal)
val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
val staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
})

// Verify the error message indicates epoch mismatch (3 < 4)
assertTrue(exception.getMessage.contains("smaller than the last seen epoch"))
assertTrue(exception.getMessage.contains(s"$originalEpoch"))
assertTrue(exception.getMessage.contains(s"$bumpedEpoch"))
}
}

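Both tests assert on the exception type rather than only the message because the type is what drives client behavior. Below is a hypothetical caller-side sketch (not Kafka's producer internals) of that distinction, under the assumption that kafka-clients is on the classpath; the object and method names are made up for illustration.

import org.apache.kafka.common.errors.{InvalidProducerEpochException, InvalidTxnStateException}

object AppendErrorClassification {
  // Hypothetical classification of append failures seen by a transactional caller.
  def classify(t: Throwable): String = t match {
    case _: InvalidProducerEpochException =>
      // Recoverable: abort the transaction, pick up the bumped epoch, and retry.
      "recoverable stale-epoch error"
    case _: InvalidTxnStateException =>
      // Fatal: the batch was never part of an ongoing transaction.
      "fatal transaction-state error"
    case other =>
      throw other
  }
}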