Skip to content

Commit

Permalink
KAFKA-16750: Minor refactorings for better readability
Browse files Browse the repository at this point in the history
  • Loading branch information
chirag-wadhwa5 committed Jun 27, 2024
1 parent 276e4f2 commit 1986472
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 91 deletions.
20 changes: 8 additions & 12 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,23 +250,19 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
String groupId,
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics
) {
log.debug("Acknowledge request for topicIdPartitions: {} with groupId: {}",
log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}",
acknowledgeTopics.keySet(), groupId);
Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
if (sharePartition != null) {
synchronized (sharePartition) {
CompletableFuture<Errors> future = sharePartition.acknowledge(memberId, acknowledgePartitionBatches).thenApply(throwable -> {
if (throwable.isPresent()) {
return Errors.forException(throwable.get());
} else {
return Errors.NONE;
}

});
futures.put(topicIdPartition, future);
}
CompletableFuture<Errors> future = sharePartition.acknowledge(memberId, acknowledgePartitionBatches).thenApply(throwable -> {
if (throwable.isPresent()) {
return Errors.forException(throwable.get());
}
return Errors.NONE;
});
futures.put(topicIdPartition, future);
} else {
futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
}
Expand Down
146 changes: 67 additions & 79 deletions core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ public void testCloseSharePartitionManager() throws Exception {
Timer timer = Mockito.mock(SystemTimerReaper.class);
Persister persister = Mockito.mock(Persister.class);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withTimer(timer).withShareGroupPersister(persister).build();
.withTimer(timer).withShareGroupPersister(persister).build();

// Verify that 0 calls are made to timer.close() and persister.stop().
Mockito.verify(timer, times(0)).close();
Expand All @@ -1243,52 +1243,81 @@ public void testCloseSharePartitionManager() throws Exception {
}

@Test
public void testAcknowledgeSuccess() {
public void testAcknowledgeSinglePartition() {
String groupId = "grp";
String memberId = Uuid.randomUuid().toString();

TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

SharePartition sp = mock(SharePartition.class);
when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp);
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
acknowledgeTopics.put(tp, Arrays.asList(
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
));
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(1, result.size());
assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp).errorCode());
}

@Test
public void testAcknowledgeMultiplePartition() {
String groupId = "grp";
String memberId = Uuid.randomUuid().toString();

TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2));
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 4));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
SharePartition sp3 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache."))
));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(sp3.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
.withPartitionCacheMap(partitionCacheMap).build();
acknowledgeTopics.put(tp1, Arrays.asList(
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp2, Arrays.asList(
new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1))
new ShareAcknowledgementBatch(15, 26, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(34, 56, Collections.singletonList((byte) 2))
));
acknowledgeTopics.put(tp3, Arrays.asList(
new ShareAcknowledgementBatch(45, 60, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(67, 82, Collections.singletonList((byte) 2))
new ShareAcknowledgementBatch(4, 15, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(16, 21, Collections.singletonList((byte) 3))
));
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(3, result.size());
assertTrue(result.containsKey(tp1));
assertTrue(result.containsKey(tp2));
assertTrue(result.containsKey(tp3));
assertEquals(0, result.get(tp1).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp2).errorCode());
assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode());
assertEquals(0, result.get(tp2).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp2).errorCode());
assertEquals(0, result.get(tp3).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp3).errorCode());
}

@Test
Expand All @@ -1297,94 +1326,59 @@ public void testAcknowledgeIncorrectGroupId() {
String groupId2 = "grp2";
String memberId = Uuid.randomUuid().toString();

TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2));
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 4));
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache."))
));
SharePartition sp = mock(SharePartition.class);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp);
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();

acknowledgeTopics.put(tp1, Arrays.asList(
acknowledgeTopics.put(tp, Arrays.asList(
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp2, Arrays.asList(
new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp3, Arrays.asList(
new ShareAcknowledgementBatch(45, 60, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(67, 82, Collections.singletonList((byte) 2))
));
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId, groupId2, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(3, result.size());
assertTrue(result.containsKey(tp1));
assertTrue(result.containsKey(tp2));
assertTrue(result.containsKey(tp3));
assertEquals(0, result.get(tp1).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp2).errorCode());
assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode());
assertEquals(1, result.size());
assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
}

@Test
public void testAcknowledgeIncorrectMemberId() {
String groupId = "grp";
String memberId2 = Uuid.randomUuid().toString();
String memberId = Uuid.randomUuid().toString();

TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2));
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Member is not the owner of batch record"))
));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture(
SharePartition sp = mock(SharePartition.class);
when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Member is not the owner of batch record"))
));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp);
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();

acknowledgeTopics.put(tp1, Arrays.asList(
acknowledgeTopics.put(tp, Arrays.asList(
new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1))
));
acknowledgeTopics.put(tp2, Arrays.asList(
new ShareAcknowledgementBatch(5, 17, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(19, 26, Collections.singletonList((byte) 1))
));

CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId2, groupId, acknowledgeTopics);
sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(2, result.size());
assertTrue(result.containsKey(tp1));
assertTrue(result.containsKey(tp2));
assertEquals(0, result.get(tp1).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp2).errorCode());
assertEquals(1, result.size());
assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp).errorCode());
}

@Test
Expand All @@ -1394,12 +1388,6 @@ public void testAcknowledgeEmptyPartitionCacheMap() {

TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3));

SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(
Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache."))
));
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
Expand Down

0 comments on commit 1986472

Please sign in to comment.