diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 359753cc0a26..978ca2872ae5 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -17,6 +17,7 @@
 package kafka.server.share;
 
 import kafka.server.DelayedOperation;
+import kafka.server.DelayedOperationPurgatory;
 import kafka.server.LogReadResult;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
@@ -54,17 +55,20 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
     private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
+    private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;
     private Map topicPartitionDataFromTryComplete;
 
     DelayedShareFetch(
             ShareFetchData shareFetchData,
             ReplicaManager replicaManager,
-            Map<SharePartitionKey, SharePartition> partitionCacheMap) {
+            Map<SharePartitionKey, SharePartition> partitionCacheMap,
+            DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
         super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
         this.shareFetchData = shareFetchData;
         this.replicaManager = replicaManager;
         this.partitionCacheMap = partitionCacheMap;
+        this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
         this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
     }
 
@@ -131,6 +135,11 @@ public void onComplete() {
             }
             // Releasing the lock to move ahead with the next request in queue.
             releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+            // Once a fetch request completes for a topic-partition and the locks for that partition are released,
+            // we should check whether there is a pending share fetch request for the topic-partition and complete it.
+            // We add the action to the delayed actions queue to avoid an infinite call stack, which could happen if
+            // we directly called delayedShareFetchPurgatory.checkAndComplete.
+            replicaManager.addCompleteDelayedShareFetchPurgatoryAction(CollectionConverters.asScala(result.keySet()).toSeq(), shareFetchData.groupId(), delayedShareFetchPurgatory);
         });
     } catch (Exception e) {
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
index a4c13f8f1a8f..15c8f727ace2 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
@@ -28,7 +28,7 @@
  */
 public class DelayedShareFetchKey extends SharePartitionKey implements DelayedOperationKey {
 
-    DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
+    public DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
         super(groupId, topicIdPartition);
     }
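The comment added to `onComplete` is the crux of this change: completing one delayed share fetch must not synchronously force-complete other entries watching the same key, because each completion would re-enter the purgatory and the call stack could grow without bound. Below is a minimal, self-contained sketch of the defer-then-drain pattern being relied on; the class and method names are hypothetical stand-ins, not the actual `DelayedActionQueue`/`ReplicaManager` implementation:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of deferred completion: instead of recursing into the
// purgatory from onComplete(), the "check and complete" work is queued and run
// later on a flat stack.
public class DeferredCompletionSketch {
    private final Queue<Runnable> actionQueue = new ArrayDeque<>();

    // Stands in for DelayedShareFetch.onComplete(): enqueue the follow-up work
    // rather than calling checkAndComplete directly.
    void onComplete(String watchKey) {
        actionQueue.add(() -> System.out.println("checkAndComplete(" + watchKey + ")"));
    }

    // Stands in for ReplicaManager.tryCompleteActions(): drains queued actions one
    // at a time, so each completion returns before the next one starts.
    void tryCompleteActions() {
        Runnable action;
        while ((action = actionQueue.poll()) != null) {
            action.run();
        }
    }

    public static void main(String[] args) {
        DeferredCompletionSketch sketch = new DeferredCompletionSketch();
        sketch.onComplete("grp:foo-0");
        sketch.onComplete("grp:foo-1");
        sketch.tryCompleteActions(); // both keys are checked, stack depth stays constant
    }
}
```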
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 09001872174c..eada97191f5f 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -600,7 +600,7 @@ void maybeProcessFetchQueue() {
                 new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));
 
             // Add the share fetch to the delayed share fetch purgatory to process the fetch request.
-            addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap),
+            addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedShareFetchPurgatory),
                 delayedShareFetchWatchKeys);
 
             // Release the lock so that other threads can process the queue.
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fee70d8fe371..ed98b139e859 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,6 +25,7 @@ import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
 import kafka.server.metadata.ZkMetadataCache
+import kafka.server.share.{DelayedShareFetch, DelayedShareFetchKey}
 import kafka.utils.Implicits._
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -748,6 +749,22 @@ class ReplicaManager(val config: KafkaConfig,
 
   def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
 
+  /**
+   * Append an action to the DelayedActionQueue to complete any pending share fetch requests for the already fetched topic partitions.
+   * @param topicIdPartitions Topic partitions for which we know that the data has been fetched already
+   * @param groupId Share group's id
+   * @param delayedShareFetchPurgatory Purgatory storing the pending delayed share fetch requests
+   */
+  def addCompleteDelayedShareFetchPurgatoryAction(topicIdPartitions: Seq[TopicIdPartition],
+                                                  groupId: String,
+                                                  delayedShareFetchPurgatory: DelayedOperationPurgatory[DelayedShareFetch]): Unit = {
+    defaultActionQueue.add {
+      () => topicIdPartitions.foreach(topicIdPartition => {
+        delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchKey(groupId, topicIdPartition))
+      })
+    }
+  }
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
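Since the new `ReplicaManager` method accepts a Scala `Seq[TopicIdPartition]`, Java callers such as `DelayedShareFetch.onComplete` above convert their collections with `scala.jdk.javaapi.CollectionConverters` before the call. A stand-alone sketch of just that conversion (the values are hypothetical; only the `asScala(...).toSeq()` idiom comes from the patch):

```java
import java.util.Set;

import scala.collection.immutable.Seq;
import scala.jdk.javaapi.CollectionConverters;

// Sketch of the Java-to-Scala conversion used when invoking the new
// ReplicaManager.addCompleteDelayedShareFetchPurgatoryAction from Java code.
public class SeqConversionSketch {
    public static void main(String[] args) {
        Set<String> fetchedPartitions = Set.of("foo-0", "foo-1"); // hypothetical stand-ins
        // asScala(...) wraps the Java set in a Scala collection; toSeq() materialises
        // the immutable Seq expected by the Scala-side method signature.
        Seq<String> scalaSeq = CollectionConverters.asScala(fetchedPartitions).toSeq();
        System.out.println(scalaSeq);
    }
}
```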
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 3ddd2a79021e..3cccead0dbbe 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -16,6 +16,7 @@
  */
 package kafka.server.share;
 
+import kafka.server.DelayedOperationPurgatory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 
@@ -29,22 +30,36 @@
 import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.SystemTimerReaper;
+import org.apache.kafka.server.util.timer.Timer;
 
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.jdk.javaapi.CollectionConverters;
+
+import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
 import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
+import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -53,6 +68,18 @@ public class DelayedShareFetchTest {
 
     private static final int MAX_WAIT_MS = 5000;
+    private static Timer mockTimer;
+
+    @BeforeEach
+    public void setUp() {
+        mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper",
+            new SystemTimer("DelayedShareFetchTestTimer"));
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        mockTimer.close();
+    }
 
     @Test
     public void testDelayedShareFetchTryCompleteReturnsFalse() {
@@ -95,6 +122,7 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
     public void testDelayedShareFetchTryCompleteReturnsTrue() {
         String groupId = "grp";
         Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
         TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
         TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
         Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
@@ -118,9 +146,14 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
+            Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
         DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
             .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(replicaManager)
             .build();
         assertFalse(delayedShareFetch.isCompleted());
@@ -200,6 +233,9 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
+            Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
         DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
             .withReplicaManager(replicaManager)
@@ -261,10 +297,99 @@ public void testToCompleteAnAlreadyCompletedFuture() {
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
     }
 
+    @Test
+    public void testForceCompleteTriggersDelayedActionsQueue() {
+        String groupId = "grp";
+        Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2));
+        Map<TopicIdPartition, Integer> partitionMaxBytes1 = new HashMap<>();
+        partitionMaxBytes1.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes1.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        // No share partition is available for acquiring initially.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(false);
+
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+
+        ShareFetchData shareFetchData1 = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes1);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+
+        Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
+        partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));
+
+        DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData1)
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
+        // We add a delayed share fetch entry to the purgatory which will be waiting for completion since none of the
+        // partitions in the share fetch request can be acquired.
+        delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
+
+        assertEquals(2, delayedShareFetchPurgatory.watched());
+        assertFalse(shareFetchData1.future().isDone());
+
+        Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
+        partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
+        partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
+        ShareFetchData shareFetchData2 = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes2);
+
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Counter for checking the number of times replicaManager.addCompleteDelayedShareFetchPurgatoryAction is called in the test.
+        AtomicInteger counter = new AtomicInteger();
+        doAnswer(invocationOnMock -> counter.addAndGet(1)).when(replicaManager).addCompleteDelayedShareFetchPurgatoryAction(
+            CollectionConverters.asScala(Collections.singleton(tp1)).toSeq(), groupId, delayedShareFetchPurgatory);
+
+        DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData2)
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
+            .build();
+
+        // sp1 can be acquired now.
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp1.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
+            Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
+
+        // When forceComplete is called for delayedShareFetch2, since tp1 is common between the two delayed share
+        // fetch requests, it should add a "check and complete" action for request key tp1 on the purgatory.
+        delayedShareFetch2.forceComplete();
+        assertTrue(delayedShareFetch2.isCompleted());
+        assertTrue(shareFetchData2.future().isDone());
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+        assertFalse(delayedShareFetch1.isCompleted());
+        assertEquals(1, counter.get());
+    }
+
     static class DelayedShareFetchBuilder {
         ShareFetchData shareFetchData = mock(ShareFetchData.class);
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
         private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
 
         DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) {
             this.shareFetchData = shareFetchData;
@@ -281,15 +406,21 @@ DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap) {
             return this;
         }
 
+        DelayedShareFetchBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
+            this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
+            return this;
+        }
+
         public static DelayedShareFetchBuilder builder() {
             return new DelayedShareFetchBuilder();
         }
 
         public DelayedShareFetch build() {
             return new DelayedShareFetch(
-                shareFetchData,
-                replicaManager,
-                partitionCacheMap);
+                shareFetchData,
+                replicaManager,
+                partitionCacheMap,
+                delayedShareFetchPurgatory);
         }
     }
 }
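The new `testForceCompleteTriggersDelayedActionsQueue` above exercises the full purgatory lifecycle: entries parked against `DelayedShareFetchKey` watch keys via `tryCompleteElseWatch`, and a later `checkAndComplete` on a shared key releasing a parked entry. A condensed sketch of that lifecycle follows, with a hypothetical `NoopOperation` standing in for `DelayedShareFetch`; it assumes only the `DelayedOperation` and `DelayedOperationPurgatory` calls the test itself makes, plus `purgatory.shutdown()` to stop the reaper thread:

```java
import java.util.Collections;
import java.util.Set;

import kafka.server.DelayedOperation;
import kafka.server.DelayedOperationKey;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.share.DelayedShareFetchKey;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;

import scala.Option;
import scala.jdk.javaapi.CollectionConverters;

// Hypothetical operation: completable only once `ready` is flipped, mirroring how
// DelayedShareFetch is gated on maybeAcquireFetchLock/canAcquireRecords.
public class PurgatoryLifecycleSketch {

    static class NoopOperation extends DelayedOperation {
        volatile boolean ready = false;

        NoopOperation(long delayMs) {
            super(delayMs, Option.empty());
        }

        @Override
        public boolean tryComplete() {
            return ready && forceComplete();
        }

        @Override
        public void onComplete() { }

        @Override
        public void onExpiration() { }
    }

    public static void main(String[] args) throws Exception {
        Timer timer = new SystemTimerReaper("SketchReaper", new SystemTimer("SketchTimer"));
        DelayedOperationPurgatory<NoopOperation> purgatory =
            new DelayedOperationPurgatory<>("Sketch", timer, 0, 1000, true, true);

        // The patch makes this constructor public precisely so other components can build keys.
        DelayedOperationKey key = new DelayedShareFetchKey(
            "grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        Set<DelayedOperationKey> watchKeys = Collections.singleton(key);

        NoopOperation op = new NoopOperation(5000);
        // Not completable yet, so the purgatory parks it against the watch key.
        purgatory.tryCompleteElseWatch(op, CollectionConverters.asScala(watchKeys).toSeq());
        System.out.println("watched = " + purgatory.watched());      // 1

        op.ready = true;
        // The same call the queued ReplicaManager action makes per fetched partition.
        purgatory.checkAndComplete(key);
        System.out.println("completed = " + op.isCompleted());       // true

        purgatory.shutdown();
        timer.close();
    }
}
```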
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index c6424a04d08f..8f66fca44ca0 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -130,10 +130,10 @@ public class SharePartitionManagerTest {
     private static final int RECORD_LOCK_DURATION_MS = 30000;
     private static final int MAX_DELIVERY_COUNT = 5;
     private static final short MAX_IN_FLIGHT_MESSAGES = 200;
-    static final int PARTITION_MAX_BYTES = 40000;
     private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
-    private static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
     private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
+    static final int PARTITION_MAX_BYTES = 40000;
+    static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
     private static Timer mockTimer;
 
@@ -2270,7 +2270,7 @@ private void assertErroneousAndValidTopicIdPartitions(
         assertEquals(expectedValidSet, actualValidPartitions);
     }
 
-    private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
+    static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
         List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
         topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
             new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),