Skip to content

Commit

Permalink
KAFKA-16750: Added acknowledge code in SharePartitionManager.java cla…
Browse files Browse the repository at this point in the history
…ss including unit tests
  • Loading branch information
chirag-wadhwa5 committed Jun 26, 2024
1 parent adee6f0 commit 5f943a7
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 5 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
files="(KafkaClusterTestKit).java"/>
<suppress checks="JavaNCSS"
files="RemoteLogManagerTest.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="SharePartitionManagerTest"/>

<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>
Expand Down
36 changes: 31 additions & 5 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,39 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
String groupId,
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics
) {
log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}",
log.debug("Acknowledge request for topicIdPartitions: {} with groupId: {}",
acknowledgeTopics.keySet(), groupId);
Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
if (sharePartition != null) {
synchronized (sharePartition) {
CompletableFuture<Errors> future = sharePartition.acknowledge(memberId, acknowledgePartitionBatches).thenApply(throwable -> {
if (throwable.isPresent()) {
return Errors.forException(throwable.get());
} else {
return Errors.NONE;
}

});
futures.put(topicIdPartition, future);
}
} else {
futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
}
});

CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> future = new CompletableFuture<>();
future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));

return future;
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.values().toArray(new CompletableFuture[0]));
return allFutures.thenApply(v -> {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
futures.forEach((topicIdPartition, future) -> {
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(topicIdPartition.partition())
.setErrorCode(future.join().code()));
});
return result;
});
}

/**
Expand Down
177 changes: 177 additions & 0 deletions core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -41,6 +42,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.NoOpShareStatePersister;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;
import org.apache.kafka.server.util.timer.SystemTimer;
Expand All @@ -54,6 +56,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import java.util.ArrayList;
Expand Down Expand Up @@ -1221,6 +1224,180 @@ public void testReplicaManagerFetchShouldProceed() {
any(), any(), any(ReplicaQuota.class), any());
}

@Test
public void testAcknowledgeSuccess() {
String groupId = "grp";
String memberId = Uuid.randomUuid().toString();

TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2));
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 4));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache."))
));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
acknowledgeTopics.put(tp1, Arrays.asList(
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp2, Arrays.asList(
new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp3, Arrays.asList(
new ShareAcknowledgementBatch(45, 60, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(67, 82, Collections.singletonList((byte) 2))
));
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(3, result.size());
assertTrue(result.containsKey(tp1));
assertTrue(result.containsKey(tp2));
assertTrue(result.containsKey(tp3));
assertEquals(0, result.get(tp1).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp2).errorCode());
assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode());
}

@Test
public void testAcknowledgeIncorrectGroupId() {
String groupId = "grp";
String groupId2 = "grp2";
String memberId = Uuid.randomUuid().toString();

TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2));
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 4));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache."))
));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();

acknowledgeTopics.put(tp1, Arrays.asList(
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp2, Arrays.asList(
new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp3, Arrays.asList(
new ShareAcknowledgementBatch(45, 60, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(67, 82, Collections.singletonList((byte) 2))
));
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId, groupId2, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(3, result.size());
assertTrue(result.containsKey(tp1));
assertTrue(result.containsKey(tp2));
assertTrue(result.containsKey(tp3));
assertEquals(0, result.get(tp1).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp2).errorCode());
assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode());
}

@Test
public void testAcknowledgeIncorrectMemberId() {
String groupId = "grp";
String memberId2 = Uuid.randomUuid().toString();

TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Member is not the owner of batch record"))
));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Member is not the owner of batch record"))
));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();

acknowledgeTopics.put(tp1, Arrays.asList(
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp2, Arrays.asList(
new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1))
));

CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId2, groupId, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(2, result.size());
assertTrue(result.containsKey(tp1));
assertTrue(result.containsKey(tp2));
assertEquals(0, result.get(tp1).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp2).errorCode());
}

@Test
public void testAcknowledgeEmptyPartitionCacheMap() {
String groupId = "grp";
String memberId = Uuid.randomUuid().toString();

TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache."))
));
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
acknowledgeTopics.put(tp, Arrays.asList(
new ShareAcknowledgementBatch(78, 90, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(94, 99, Collections.singletonList((byte) 2))
));
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(1, result.size());
assertTrue(result.containsKey(tp));
assertEquals(3, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
}

private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
}
Expand Down

0 comments on commit 5f943a7

Please sign in to comment.