KAFKA-17026: Implement updateCacheAndOffsets functionality on LSO movement #16459

Merged
merged 11 commits on Jul 3, 2024
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -44,7 +44,7 @@
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
<suppress checks="JavaNCSS"
files="RemoteLogManagerTest.java"/>
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling" files="SharePartitionManagerTest"/>

<!-- server tests -->
139 changes: 136 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -689,7 +689,127 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
* @param logStartOffset The new log start offset.
*/
void updateCacheAndOffsets(long logStartOffset) {
// TODO: Provide implementation to update cache and offsets on LSO movement.
lock.writeLock().lock();
try {
if (logStartOffset <= startOffset) {
log.error("The log start offset: {} is not greater than the start offset: {} for the share partition: {}-{}",
logStartOffset, startOffset, groupId, topicIdPartition);
return;
}
log.debug("Updating start offset for share partition: {}-{} from: {} to: {} since LSO has moved to: {}",
groupId, topicIdPartition, startOffset, logStartOffset, logStartOffset);
if (cachedState.isEmpty()) {
// If the cached state is empty, then the start and end offset will be the new log start offset.
// This can occur during the initialization of the share partition if the LSO has moved.
startOffset = logStartOffset;
endOffset = logStartOffset;
return;
}

// Archive the available records in the cached state that are before the new log start offset.
boolean anyRecordArchived = archiveAvailableRecordsOnLsoMovement(logStartOffset);
// If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED,
// then there is a chance that the next fetch offset can change.
if (anyRecordArchived) {
findNextFetchOffset.set(true);
}

// The new startOffset will be the log start offset.
startOffset = logStartOffset;
if (endOffset < startOffset) {
// This case means that the cached state is completely fresh now.
// Example scenario: a batch of 0-10 is in acquired state in the cached state, then the LSO moves to 15,
// hence endOffset should be 15 as well.
endOffset = startOffset;
}

// Note -
// 1. We will be writing the new startOffset lazily during the acknowledge/release acquired records API calls.
// 2. We will not be writing the archived state batches to the persister.
} finally {
lock.writeLock().unlock();
}
}

private boolean archiveAvailableRecordsOnLsoMovement(long logStartOffset) {
lock.writeLock().lock();
try {
boolean isAnyOffsetArchived = false, isAnyBatchArchived = false;
for (Map.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
long batchStartOffset = entry.getKey();
// We do not need to transition the state of batches/offsets that are later than the new log start offset.
if (batchStartOffset >= logStartOffset) {
break;
}
InFlightBatch inFlightBatch = entry.getValue();
boolean fullMatch = checkForFullMatch(inFlightBatch, startOffset, logStartOffset - 1);

// Maintain state per offset if the inflight batch is not a full match or the offset state is managed.
if (!fullMatch || inFlightBatch.offsetState() != null) {
log.debug("Subset or offset tracked batch record found while trying to update offsets and cached" +
" state map due to LSO movement, batch: {}, offsets to update - " +
"first: {}, last: {} for the share partition: {}-{}", inFlightBatch, startOffset,
logStartOffset - 1, groupId, topicIdPartition);

if (inFlightBatch.offsetState() == null) {
if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
continue;
}
inFlightBatch.maybeInitializeOffsetStateUpdate();
}
isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, logStartOffset - 1);
continue;
}
// The in-flight batch is a full match, hence change the state of the complete batch.
isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch);
}
return isAnyOffsetArchived || isAnyBatchArchived;
} finally {
lock.writeLock().unlock();
}
}

private boolean archivePerOffsetBatchRecords(InFlightBatch inFlightBatch,
long startOffsetToArchive,
long endOffsetToArchive) {
lock.writeLock().lock();
try {
boolean isAnyOffsetArchived = false;
log.trace("Archiving offset tracked batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
if (offsetState.getKey() < startOffsetToArchive) {
continue;
}
if (offsetState.getKey() > endOffsetToArchive) {
// No further offsets to process.
break;
}
if (offsetState.getValue().state != RecordState.AVAILABLE) {
continue;
}

offsetState.getValue().archive(EMPTY_MEMBER_ID);
isAnyOffsetArchived = true;
}
return isAnyOffsetArchived;
} finally {
lock.writeLock().unlock();
}
}

private boolean archiveCompleteBatch(InFlightBatch inFlightBatch) {
lock.writeLock().lock();
try {
log.trace("Archiving complete batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
if (inFlightBatch.batchState() == RecordState.AVAILABLE) {
// Change the state of the complete batch since the same state exists for the entire inFlight batch.
inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
return true;
}
} finally {
lock.writeLock().unlock();
}
return false;
}

/**
@@ -903,7 +1023,8 @@ private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetT
* @return True if the start offset has moved and is within the request's first and last offsets, false otherwise.
*/
private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
return batchFirstOffset < startOffset && batchLastOffset >= startOffset;
long localStartOffset = startOffset();
return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
}

private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
@@ -1018,7 +1139,7 @@ private Optional<Throwable> acknowledgementBatchRecords(
// Determine if the in-flight batch is a full match from the request batch.
boolean fullMatch = checkForFullMatch(inFlightBatch, batch.firstOffset(), batch.lastOffset());
boolean isPerOffsetClientAck = batch.acknowledgeTypes().size() > 1;
boolean hasStartOffsetMoved = checkForStartOffsetWithinBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset);
boolean hasStartOffsetMoved = checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset());

// Maintain state per offset if the inflight batch is not a full match or the
// offset state is managed or client sent individual offsets state or
@@ -1707,6 +1828,13 @@ NavigableMap<Long, InFlightState> offsetState() {
return offsetState;
}

private void archiveBatch(String newMemberId) {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
batchState.archive(newMemberId);
}

private InFlightState tryUpdateBatchState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
if (batchState == null) {
throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
@@ -1853,6 +1981,11 @@ private InFlightState tryUpdateState(RecordState newState, boolean incrementDeli
}
}

private void archive(String newMemberId) {
state = RecordState.ARCHIVED;
memberId = newMemberId;
}

private InFlightState startStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
return tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
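The new updateCacheAndOffsets path above archives any still-AVAILABLE records that sit before the moved log start offset, advances startOffset to the new LSO, and clamps endOffset when the whole cache has fallen behind it. Below is a minimal standalone sketch of those semantics under a hypothetical simplified model (a plain TreeMap of batch start offset to state, with none of the per-offset tracking, locking, or persister interaction of the real SharePartition); the class and field names are illustrative only.

import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplified model of the LSO-movement update; not the real SharePartition class.
class LsoMovementSketch {
    enum State { AVAILABLE, ACQUIRED, ARCHIVED }

    static long startOffset = 0;
    static long endOffset = 10;
    // Batch start offset -> batch state, standing in for the cachedState map.
    static TreeMap<Long, State> cachedState =
        new TreeMap<>(Map.of(0L, State.AVAILABLE, 5L, State.ACQUIRED));

    static void updateCacheAndOffsets(long logStartOffset) {
        if (logStartOffset <= startOffset) {
            return; // LSO did not move forward; nothing to do.
        }
        for (Map.Entry<Long, State> entry : cachedState.entrySet()) {
            if (entry.getKey() >= logStartOffset) {
                break; // Batches at or beyond the new LSO are untouched.
            }
            if (entry.getValue() == State.AVAILABLE) {
                entry.setValue(State.ARCHIVED); // Only AVAILABLE records are archived.
            }
        }
        startOffset = logStartOffset;
        if (endOffset < startOffset) {
            endOffset = startOffset; // The cache is now entirely behind the LSO.
        }
    }

    public static void main(String[] args) {
        updateCacheAndOffsets(15);
        // Prints: start=15 end=15 cache={0=ARCHIVED, 5=ACQUIRED}
        System.out.println("start=" + startOffset + " end=" + endOffset + " cache=" + cachedState);
    }
}

In the real code the archive step is more granular: when a batch is only partially behind the new LSO, or already tracks per-offset state, archivePerOffsetBatchRecords transitions individual offsets instead of the whole batch, and any archive of an AVAILABLE record also sets findNextFetchOffset.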
3 changes: 2 additions & 1 deletion core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -621,7 +621,8 @@ private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition top
*
* @return The offset for the earliest timestamp.
*/
private long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition) {
// Visible for testing.
long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition) {
// TODO: We need to know the isolation level from group configs, for now we are passing Option.empty() for isolationLevel
Option<TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
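The SharePartitionManager change above widens offsetForEarliestTimestamp from private to package-private so that the new test below can replace the LSO lookup on a Mockito spy (Mockito cannot stub private methods). A self-contained sketch of that spy pattern, using a hypothetical stand-in class rather than the real SharePartitionManager:

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

// Hypothetical stand-in used only to illustrate spy-based stubbing of a package-private method.
class OffsetLookup {
    // Package-private (not private), so a Mockito spy can override it.
    long offsetForEarliestTimestamp(String topicPartition) {
        throw new UnsupportedOperationException("would query the replica manager in production");
    }

    long resolveLso(String topicPartition) {
        return offsetForEarliestTimestamp(topicPartition);
    }
}

class OffsetLookupSketch {
    public static void main(String[] args) {
        OffsetLookup lookup = spy(new OffsetLookup());
        // doReturn/when avoids invoking the real (throwing) method while stubbing the spy.
        doReturn(1L).when(lookup).offsetForEarliestTimestamp(anyString());
        System.out.println(lookup.resolveLso("foo-0")); // prints 1
    }
}

The new test applies the same pattern directly: Mockito.doReturn(1L).when(sharePartitionManagersSpy).offsetForEarliestTimestamp(any(TopicIdPartition.class)).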
112 changes: 112 additions & 0 deletions core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -92,6 +92,7 @@
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
@@ -1526,6 +1527,117 @@ public void testAcknowledgeEmptyPartitionCacheMap() {
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
}

@Test
public void testProcessFetchResponseWithLsoMovementForTopicPartition() {
String groupId = "grp";
Uuid fooId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));

Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);

ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sp0 = Mockito.mock(SharePartition.class);
SharePartition sp1 = Mockito.mock(SharePartition.class);

Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build();

SharePartitionManager sharePartitionManagersSpy = Mockito.spy(sharePartitionManager);

// Mocking the offsetForEarliestTimestamp method to return a valid LSO.
Mockito.doReturn(1L).when(sharePartitionManagersSpy).offsetForEarliestTimestamp(any(TopicIdPartition.class));

when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5);
when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4);

when(sp0.acquire(any(), any())).thenReturn(
CompletableFuture.completedFuture(Collections.emptyList()),
CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords()
.setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
when(sp1.acquire(any(), any())).thenReturn(
CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords()
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1))),
CompletableFuture.completedFuture(Collections.emptyList()));

doNothing().when(sp1).updateCacheAndOffsets(any(Long.class));
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));

CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId, Uuid.randomUuid().toString(), Arrays.asList(tp0, tp1), future, partitionMaxBytes);

MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));

List<Tuple2<TopicIdPartition, FetchPartitionData>> responseData1 = new ArrayList<>();
responseData1.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L,
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)));
responseData1.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L,
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)));
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> result1 =
sharePartitionManagersSpy.processFetchResponse(shareFetchPartitionData, responseData1);

assertTrue(result1.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 = result1.join();
assertEquals(2, resultData1.size());
assertTrue(resultData1.containsKey(tp0));
assertTrue(resultData1.containsKey(tp1));
assertEquals(0, resultData1.get(tp0).partitionIndex());
assertEquals(1, resultData1.get(tp1).partitionIndex());
assertEquals(Errors.NONE.code(), resultData1.get(tp0).errorCode());
assertEquals(Errors.NONE.code(), resultData1.get(tp1).errorCode());

// Since the fetch response carries an OFFSET_OUT_OF_RANGE error for tp0 and no error for tp1,
// there should be 1 call to updateCacheAndOffsets for tp0 and 0 calls for tp1.
Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class));
Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class));

MemoryRecords records2 = MemoryRecords.withRecords(100L, Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));

List<Tuple2<TopicIdPartition, FetchPartitionData>> responseData2 = new ArrayList<>();
responseData2.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L,
records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)));
responseData2.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L,
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)));
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> result2 =
sharePartitionManagersSpy.processFetchResponse(shareFetchPartitionData, responseData2);

assertTrue(result2.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData2 = result2.join();
assertEquals(2, resultData2.size());
assertTrue(resultData2.containsKey(tp0));
assertTrue(resultData2.containsKey(tp1));
assertEquals(0, resultData2.get(tp0).partitionIndex());
assertEquals(1, resultData2.get(tp1).partitionIndex());
assertEquals(Errors.NONE.code(), resultData2.get(tp0).errorCode());
assertEquals(Errors.NONE.code(), resultData2.get(tp1).errorCode());

// Since neither tp0 nor tp1 carries an error in the second fetch response,
// the updateCacheAndOffsets call counts should remain the same as in the previous case.
Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class));
Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class));
}

private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
}