diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 349a60ba40cb..e53f794cda24 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -45,6 +45,7 @@ files="(KafkaClusterTestKit).java"/> + diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index a18d8dbe854d..aeaba437a2c1 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -252,11 +252,33 @@ public CompletableFuture> futures = new HashMap<>(); + acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> { + SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition)); + if (sharePartition != null) { + CompletableFuture future = sharePartition.acknowledge(memberId, acknowledgePartitionBatches).thenApply(throwable -> { + if (throwable.isPresent()) { + return Errors.forException(throwable.get()); + } + return Errors.NONE; + }); + futures.put(topicIdPartition, future); + } else { + futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION)); + } + }); - CompletableFuture> future = new CompletableFuture<>(); - future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); - - return future; + CompletableFuture allFutures = CompletableFuture.allOf( + futures.values().toArray(new CompletableFuture[0])); + return allFutures.thenApply(v -> { + Map result = new HashMap<>(); + futures.forEach((topicIdPartition, future) -> { + result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(topicIdPartition.partition()) + .setErrorCode(future.join().code())); + }); + return result; + }); } /** diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index bf14d25a3031..de18de41fd20 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.group.share.NoOpShareStatePersister; import org.apache.kafka.server.group.share.Persister; +import org.apache.kafka.server.share.ShareAcknowledgementBatch; import org.apache.kafka.server.share.ShareSessionCache; import org.apache.kafka.server.share.ShareSessionKey; import org.apache.kafka.server.util.timer.MockTimer; @@ -1364,6 +1365,167 @@ public void testReleaseAcquiredRecordsWithEmptyTopicPartitions() { assertEquals(0, result.size()); } + @Test + public void testAcknowledgeSinglePartition() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + SharePartition sp = mock(SharePartition.class); + + when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + Map> acknowledgeTopics = new HashMap<>(); + acknowledgeTopics.put(tp, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(1, result.size()); + assertTrue(result.containsKey(tp)); + assertEquals(0, result.get(tp).partitionIndex()); + assertEquals(Errors.NONE.code(), result.get(tp).errorCode()); + } + + @Test + public void testAcknowledgeMultiplePartition() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0)); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); + + when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(sp3.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp3), sp3); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + Map> acknowledgeTopics = new HashMap<>(); + acknowledgeTopics.put(tp1, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + acknowledgeTopics.put(tp2, Arrays.asList( + new ShareAcknowledgementBatch(15, 26, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(34, 56, Collections.singletonList((byte) 2)) + )); + acknowledgeTopics.put(tp3, Arrays.asList( + new ShareAcknowledgementBatch(4, 15, Collections.singletonList((byte) 3)), + new ShareAcknowledgementBatch(16, 21, Collections.singletonList((byte) 3)) + )); + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(3, result.size()); + assertTrue(result.containsKey(tp1)); + assertTrue(result.containsKey(tp2)); + assertTrue(result.containsKey(tp3)); + assertEquals(0, result.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), result.get(tp1).errorCode()); + assertEquals(0, result.get(tp2).partitionIndex()); + assertEquals(Errors.NONE.code(), result.get(tp2).errorCode()); + assertEquals(0, result.get(tp3).partitionIndex()); + assertEquals(Errors.NONE.code(), result.get(tp3).errorCode()); + } + + @Test + public void testAcknowledgeIncorrectGroupId() { + String groupId = "grp"; + String groupId2 = "grp2"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + SharePartition sp = mock(SharePartition.class); + + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + Map> acknowledgeTopics = new HashMap<>(); + acknowledgeTopics.put(tp, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId2, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(1, result.size()); + assertTrue(result.containsKey(tp)); + assertEquals(0, result.get(tp).partitionIndex()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); + } + + @Test + public void testAcknowledgeIncorrectMemberId() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + SharePartition sp = mock(SharePartition.class); + when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture( + Optional.of(new InvalidRequestException("Member is not the owner of batch record")) + )); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + Map> acknowledgeTopics = new HashMap<>(); + acknowledgeTopics.put(tp, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(1, result.size()); + assertTrue(result.containsKey(tp)); + assertEquals(0, result.get(tp).partitionIndex()); + assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp).errorCode()); + } + + @Test + public void testAcknowledgeEmptyPartitionCacheMap() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3)); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build(); + + Map> acknowledgeTopics = new HashMap<>(); + acknowledgeTopics.put(tp, Arrays.asList( + new ShareAcknowledgementBatch(78, 90, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(94, 99, Collections.singletonList((byte) 2)) + )); + CompletableFuture> resultFuture = + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + Map result = resultFuture.join(); + assertEquals(1, result.size()); + assertTrue(result.containsKey(tp)); + assertEquals(3, result.get(tp).partitionIndex()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); + } + private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() { return new ShareFetchResponseData.PartitionData().setPartitionIndex(0); }