Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,10 @@ private void setAssignmentChanges(PartitionChangeRecord record) {

private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
if (!useLastKnownLeaderInBalancedRecovery || !eligibleLeaderReplicasEnabled) return;
if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 ||
partition.lastKnownElr[0] != partition.leader)) {
if (record.leader() == NO_LEADER && partition.lastKnownElr.length == 0) {
// Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownElr(List.of(partition.leader));
} else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))
&& partition.lastKnownElr.length > 0) {
} else if (record.leader() >= 0 && partition.lastKnownElr.length > 0) {
// Clear the LastKnownElr field if the partition will have or continues to have a valid leader.
record.setLastKnownElr(List.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,48 @@ public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) {
}
}

@Test
public void testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader() {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3})
.setDirectories(new Uuid[] {
DirectoryId.UNASSIGNED,
DirectoryId.UNASSIGNED,
DirectoryId.UNASSIGNED
})
.setIsr(new int[] {1})
.setElr(new int[] {2})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();

short version = 2; // ELR supported
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

// No replica is acceptable as leader, so election yields NO_LEADER.
// We intentionally do not change target ISR so record.isr remains null.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);

ApiMessageAndVersion change = builder.build().get();
PartitionChangeRecord record = (PartitionChangeRecord) change.message();

assertEquals(NO_LEADER, record.leader());
// There is no ISR update if we do not perform the leader verification on the ISR members.
assertNull(record.isr(), record.toString());
assertEquals(1, record.lastKnownElr().size(), record.toString());
assertEquals(1, record.lastKnownElr().get(0), record.toString());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
assertArrayEquals(new int[] {1}, partition.lastKnownElr);
}


@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {
Expand Down