From 5f943a7b88a50e636a9c39ecf473b001e5ac454a Mon Sep 17 00:00:00 2001 From: Chirag Wadhwa Date: Wed, 26 Jun 2024 18:10:37 +0530 Subject: [PATCH] KAFKA-16750: Added acknowledge code in SharePartitionManager.java class including unit tests --- checkstyle/suppressions.xml | 1 + .../server/share/SharePartitionManager.java | 36 +++- .../share/SharePartitionManagerTest.java | 177 ++++++++++++++++++ 3 files changed, 209 insertions(+), 5 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 349a60ba40cb..e53f794cda24 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -45,6 +45,7 @@ files="(KafkaClusterTestKit).java"/> + diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 848fe6b04121..0bd017ce6360 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -225,13 +225,39 @@ public CompletableFuture> acknowledgeTopics ) { - log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", + log.debug("Acknowledge request for topicIdPartitions: {} with groupId: {}", acknowledgeTopics.keySet(), groupId); + Map> futures = new HashMap<>(); + acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> { + SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition)); + if (sharePartition != null) { + synchronized (sharePartition) { + CompletableFuture future = sharePartition.acknowledge(memberId, acknowledgePartitionBatches).thenApply(throwable -> { + if (throwable.isPresent()) { + return Errors.forException(throwable.get()); + } else { + return Errors.NONE; + } + + }); + futures.put(topicIdPartition, future); + } + } else { + futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION)); + } + }); - CompletableFuture> future = new CompletableFuture<>(); - future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); - - return future; + CompletableFuture allFutures = CompletableFuture.allOf( + futures.values().toArray(new CompletableFuture[0])); + return allFutures.thenApply(v -> { + Map result = new HashMap<>(); + futures.forEach((topicIdPartition, future) -> { + result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(topicIdPartition.partition()) + .setErrorCode(future.join().code())); + }); + return result; + }); } /** diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 68459cf09b7e..ac7e6b9317ff 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidShareSessionEpochException; import org.apache.kafka.common.errors.ShareSessionNotFoundException; +import org.apache.kafka.common.message.ShareAcknowledgeResponseData; import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.protocol.ApiKeys; @@ -41,6 +42,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.group.share.NoOpShareStatePersister; import org.apache.kafka.server.group.share.Persister; +import org.apache.kafka.server.share.ShareAcknowledgementBatch; import org.apache.kafka.server.share.ShareSessionCache; import org.apache.kafka.server.share.ShareSessionKey; import org.apache.kafka.server.util.timer.SystemTimer; @@ -54,6 +56,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import java.util.ArrayList; @@ -1221,6 +1224,180 @@ public void testReplicaManagerFetchShouldProceed() { any(), any(), any(ReplicaQuota.class), any()); } + @Test + public void testAcknowledgeSuccess() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 4)); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture( + Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache.")) + )); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + Map> acknowledgeTopics = new HashMap<>(); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + acknowledgeTopics.put(tp1, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + acknowledgeTopics.put(tp2, Arrays.asList( + new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)), + new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1)) + )); + acknowledgeTopics.put(tp3, Arrays.asList( + new ShareAcknowledgementBatch(45, 60, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(67, 82, Collections.singletonList((byte) 2)) + )); + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(3, result.size()); + assertTrue(result.containsKey(tp1)); + assertTrue(result.containsKey(tp2)); + assertTrue(result.containsKey(tp3)); + assertEquals(0, result.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), result.get(tp1).errorCode()); + assertEquals(2, result.get(tp2).partitionIndex()); + assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp2).errorCode()); + assertEquals(4, result.get(tp3).partitionIndex()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode()); + } + + @Test + public void testAcknowledgeIncorrectGroupId() { + String groupId = "grp"; + String groupId2 = "grp2"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 4)); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture( + Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache.")) + )); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + Map> acknowledgeTopics = new HashMap<>(); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + acknowledgeTopics.put(tp1, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + acknowledgeTopics.put(tp2, Arrays.asList( + new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)), + new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1)) + )); + acknowledgeTopics.put(tp3, Arrays.asList( + new ShareAcknowledgementBatch(45, 60, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(67, 82, Collections.singletonList((byte) 2)) + )); + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId2, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(3, result.size()); + assertTrue(result.containsKey(tp1)); + assertTrue(result.containsKey(tp2)); + assertTrue(result.containsKey(tp3)); + assertEquals(0, result.get(tp1).partitionIndex()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp1).errorCode()); + assertEquals(2, result.get(tp2).partitionIndex()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp2).errorCode()); + assertEquals(4, result.get(tp3).partitionIndex()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode()); + } + + @Test + public void testAcknowledgeIncorrectMemberId() { + String groupId = "grp"; + String memberId2 = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2)); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + when(sp1.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture( + Optional.of(new InvalidRequestException("Member is not the owner of batch record")) + )); + when(sp2.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture( + Optional.of(new InvalidRequestException("Member is not the owner of batch record")) + )); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + Map> acknowledgeTopics = new HashMap<>(); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + acknowledgeTopics.put(tp1, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + acknowledgeTopics.put(tp2, Arrays.asList( + new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)), + new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1)) + )); + + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId2, groupId, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(2, result.size()); + assertTrue(result.containsKey(tp1)); + assertTrue(result.containsKey(tp2)); + assertEquals(0, result.get(tp1).partitionIndex()); + assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp1).errorCode()); + assertEquals(2, result.get(tp2).partitionIndex()); + assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp2).errorCode()); + } + + @Test + public void testAcknowledgeEmptyPartitionCacheMap() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3)); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture( + Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache.")) + )); + Map> acknowledgeTopics = new HashMap<>(); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build(); + acknowledgeTopics.put(tp, Arrays.asList( + new ShareAcknowledgementBatch(78, 90, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(94, 99, Collections.singletonList((byte) 2)) + )); + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(1, result.size()); + assertTrue(result.containsKey(tp)); + assertEquals(3, result.get(tp).partitionIndex()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); + } + private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() { return new ShareFetchResponseData.PartitionData().setPartitionIndex(0); }