From 016f4bfbb1173ba8e00aa72be692b8fae35771f0 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 12 Sep 2024 15:53:48 +0530 Subject: [PATCH 1/8] Create branch From 7a40b26409cde2204147a1809bcbe503f8b0549f Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 12 Sep 2024 18:04:43 +0530 Subject: [PATCH 2/8] Added a delayed actions queue to complete pending actions of purgatory --- .../kafka/server/share/DelayedShareFetch.java | 25 ++- .../server/share/SharePartitionManager.java | 14 +- .../server/share/DelayedShareFetchTest.java | 146 +++++++++++++++++- .../share/SharePartitionManagerTest.java | 11 +- 4 files changed, 186 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index ef80047f222e..bf0fbd6c682b 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -16,7 +16,9 @@ */ package kafka.server.share; +import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperation; +import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; @@ -50,21 +52,32 @@ public class DelayedShareFetch extends DelayedOperation { private final ReplicaManager replicaManager; private final Map partitionCacheMap; private Map topicPartitionDataFromTryComplete = new LinkedHashMap<>(); + private final DelayedActionQueue delayedActionQueue; + private final DelayedOperationPurgatory delayedShareFetchPurgatory; private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class); DelayedShareFetch( SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData, ReplicaManager replicaManager, - Map partitionCacheMap) { + Map partitionCacheMap, + DelayedActionQueue delayedActionQueue, + DelayedOperationPurgatory delayedShareFetchPurgatory) { super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty()); this.shareFetchPartitionData = shareFetchPartitionData; this.replicaManager = replicaManager; this.partitionCacheMap = partitionCacheMap; + this.delayedActionQueue = delayedActionQueue; + this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; } + /** + * Complete the delayed share fetch actions that were added to the queue. Since onExpiration serves as a callback for + * forceComplete, it should not lead to infinite call stack. + */ @Override public void onExpiration() { + delayedActionQueue.tryCompleteActions(); } /** @@ -126,6 +139,16 @@ public void onComplete() { } // Releasing the lock to move ahead with the next request in queue. releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet()); + // If we have a fetch request completed for a topic-partition, we release the locks for that partition, + // then we should check if there is a pending share fetch request for the topic-partition and complete it. + // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if + // we directly call delayedShareFetchPurgatory.checkAndComplete + delayedActionQueue.add(() -> { + result.keySet().forEach(topicIdPartition -> + delayedShareFetchPurgatory.checkAndComplete( + new DelayedShareFetchKey(shareFetchPartitionData.groupId(), topicIdPartition))); + return BoxedUnit.UNIT; + }); }); } catch (Exception e) { diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index d6d43170215f..e93aac3681e6 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.ReplicaManager; @@ -145,6 +146,11 @@ public class SharePartitionManager implements AutoCloseable { */ private final DelayedOperationPurgatory delayedShareFetchPurgatory; + /** + * The delayed actions queue is used to complete any pending delayed share fetch actions. + */ + private final DelayedActionQueue delayedActionsQueue; + public SharePartitionManager( ReplicaManager replicaManager, Time time, @@ -195,6 +201,7 @@ private SharePartitionManager( this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true); + this.delayedActionsQueue = new DelayedActionQueue(); } // Visible for testing. @@ -210,7 +217,8 @@ private SharePartitionManager( int maxInFlightMessages, Persister persister, Metrics metrics, - DelayedOperationPurgatory delayedShareFetchPurgatory + DelayedOperationPurgatory delayedShareFetchPurgatory, + DelayedActionQueue delayedActionsQueue ) { this.replicaManager = replicaManager; this.time = time; @@ -225,6 +233,7 @@ private SharePartitionManager( this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; + this.delayedActionsQueue = delayedActionsQueue; } /** @@ -524,6 +533,7 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set partitionMaxBytes = new HashMap<>(); @@ -116,9 +144,14 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { when(sp0.canAcquireRecords()).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(false); + when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture( + Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)))); + doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() .withShareFetchPartitionData(shareFetchPartitionData) .withPartitionCacheMap(partitionCacheMap) + .withReplicaManager(replicaManager) .build(); assertFalse(delayedShareFetch.isCompleted()); @@ -198,6 +231,9 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { when(sp0.canAcquireRecords()).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(false); + when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture( + Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)))); + doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() .withShareFetchPartitionData(shareFetchPartitionData) .withReplicaManager(replicaManager) @@ -259,10 +295,102 @@ public void testToCompleteAnAlreadyCompletedFuture() { Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); } + @Test + public void testForceCompleteTriggersDelayedActionsQueue() { + String groupId = "grp"; + Uuid topicId = Uuid.randomUuid(); + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2)); + Map partitionMaxBytes1 = new HashMap<>(); + partitionMaxBytes1.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes1.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + // No share partition is available for acquiring initially. + when(sp0.maybeAcquireFetchLock()).thenReturn(false); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp2.maybeAcquireFetchLock()).thenReturn(false); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes1); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + + Set delayedShareFetchWatchKeys = new HashSet<>(); + partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + + DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData1) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + // We add a delayed share fetch entry to the purgatory which will be waiting for completion since none of the + // partitions in the share fetch request can be acquired. + delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); + + assertEquals(2, delayedShareFetchPurgatory.watched()); + assertFalse(shareFetchPartitionData1.future().isDone()); + + Map partitionMaxBytes2 = new HashMap<>(); + partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES); + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes2); + + doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue()); + DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData2) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .withDelayedActionQueue(delayedActionQueue) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .build(); + + // sp1 can be acquired now + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp1.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture( + Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)))); + + // run() method calls forceComplete followed by onExpiration. Since tp1 is common in between delayed share fetch + // requests, it should add a "check and complete" action for request key tp1 on the purgatory. When onExpiration + // gets called, then the delayedShareFetch1 should also be completed. + delayedShareFetch2.run(); + assertTrue(delayedShareFetch2.isCompleted()); + assertTrue(shareFetchPartitionData2.future().isDone()); + Mockito.verify(replicaManager, times(2)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + assertTrue(delayedShareFetch1.isCompleted()); + Mockito.verify(delayedActionQueue, times(1)).tryCompleteActions(); + assertTrue(shareFetchPartitionData1.future().isDone()); + // Only 1 watch key should be present in the purgatory corresponding to tp0. The entry corresponding to tp2 would have been completed. + assertEquals(1, delayedShareFetchPurgatory.watched()); + } + static class DelayedShareFetchBuilder { SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = mock(SharePartitionManager.ShareFetchPartitionData.class); private ReplicaManager replicaManager = mock(ReplicaManager.class); private Map partitionCacheMap = new HashMap<>(); + private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); + private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); DelayedShareFetchBuilder withShareFetchPartitionData(SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData) { this.shareFetchPartitionData = shareFetchPartitionData; @@ -279,15 +407,27 @@ DelayedShareFetchBuilder withPartitionCacheMap(Map delayedShareFetchPurgatory) { + this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; + return this; + } + public static DelayedShareFetchBuilder builder() { return new DelayedShareFetchBuilder(); } public DelayedShareFetch build() { return new DelayedShareFetch( - shareFetchPartitionData, - replicaManager, - partitionCacheMap); + shareFetchPartitionData, + replicaManager, + partitionCacheMap, + delayedActionsQueue, + delayedShareFetchPurgatory); } } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 9494f16c2d70..598d0436e2c9 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; import kafka.server.ReplicaManager; @@ -128,10 +129,10 @@ public class SharePartitionManagerTest { private static final int RECORD_LOCK_DURATION_MS = 30000; private static final int MAX_DELIVERY_COUNT = 5; private static final short MAX_IN_FLIGHT_MESSAGES = 200; - static final int PARTITION_MAX_BYTES = 40000; private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000; - private static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000; private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000; + static final int PARTITION_MAX_BYTES = 40000; + static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000; private static Timer mockTimer; @@ -2268,7 +2269,7 @@ private void assertErroneousAndValidTopicIdPartitions( assertEquals(expectedValidSet, actualValidPartitions); } - private Seq> buildLogReadResult(Set topicIdPartitions) { + static Seq> buildLogReadResult(Set topicIdPartitions) { List> logReadResults = new ArrayList<>(); topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), @@ -2295,6 +2296,7 @@ private static class SharePartitionManagerBuilder { private Metrics metrics = new Metrics(); private ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); + private final DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -2357,7 +2359,8 @@ public SharePartitionManager build() { MAX_IN_FLIGHT_MESSAGES, persister, metrics, - delayedShareFetchPurgatory); + delayedShareFetchPurgatory, + delayedActionsQueue); } } } From 5a57423e3c46ce02f589153269ce252e1ae70339 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Mon, 30 Sep 2024 14:18:46 +0530 Subject: [PATCH 3/8] Addressed Jun's round 1 comments --- .../kafka/server/share/DelayedShareFetch.java | 16 +-------- .../server/share/DelayedShareFetchKey.java | 2 +- .../server/share/SharePartitionManager.java | 14 ++------ .../scala/kafka/server/ReplicaManager.scala | 17 +++++++++ .../server/share/DelayedShareFetchTest.java | 35 +++++++------------ .../share/SharePartitionManagerTest.java | 5 +-- 6 files changed, 35 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index f93886bafba0..978ca2872ae5 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperation; import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; @@ -56,7 +55,6 @@ public class DelayedShareFetch extends DelayedOperation { private final ShareFetchData shareFetchData; private final ReplicaManager replicaManager; private final Map partitionCacheMap; - private final DelayedActionQueue delayedActionQueue; private final DelayedOperationPurgatory delayedShareFetchPurgatory; private Map topicPartitionDataFromTryComplete; @@ -65,24 +63,17 @@ public class DelayedShareFetch extends DelayedOperation { ShareFetchData shareFetchData, ReplicaManager replicaManager, Map partitionCacheMap, - DelayedActionQueue delayedActionQueue, DelayedOperationPurgatory delayedShareFetchPurgatory) { super(shareFetchData.fetchParams().maxWaitMs, Option.empty()); this.shareFetchData = shareFetchData; this.replicaManager = replicaManager; this.partitionCacheMap = partitionCacheMap; - this.delayedActionQueue = delayedActionQueue; this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; this.topicPartitionDataFromTryComplete = new LinkedHashMap<>(); } - /** - * Complete the delayed share fetch actions that were added to the queue. Since onExpiration serves as a callback for - * forceComplete, it should not lead to infinite call stack. - */ @Override public void onExpiration() { - delayedActionQueue.tryCompleteActions(); } /** @@ -148,12 +139,7 @@ public void onComplete() { // then we should check if there is a pending share fetch request for the topic-partition and complete it. // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if // we directly call delayedShareFetchPurgatory.checkAndComplete - delayedActionQueue.add(() -> { - result.keySet().forEach(topicIdPartition -> - delayedShareFetchPurgatory.checkAndComplete( - new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition))); - return BoxedUnit.UNIT; - }); + replicaManager.addCompleteDelayedShareFetchPurgatoryAction(CollectionConverters.asScala(result.keySet()).toSeq(), shareFetchData.groupId(), delayedShareFetchPurgatory); }); } catch (Exception e) { diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java index a4c13f8f1a8f..15c8f727ace2 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java @@ -28,7 +28,7 @@ */ public class DelayedShareFetchKey extends SharePartitionKey implements DelayedOperationKey { - DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) { + public DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) { super(groupId, topicIdPartition); } diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 98cb25e24a6d..eada97191f5f 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.ReplicaManager; @@ -148,11 +147,6 @@ public class SharePartitionManager implements AutoCloseable { */ private final DelayedOperationPurgatory delayedShareFetchPurgatory; - /** - * The delayed actions queue is used to complete any pending delayed share fetch actions. - */ - private final DelayedActionQueue delayedActionsQueue; - public SharePartitionManager( ReplicaManager replicaManager, Time time, @@ -203,7 +197,6 @@ private SharePartitionManager( this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true); - this.delayedActionsQueue = new DelayedActionQueue(); } // Visible for testing. @@ -219,8 +212,7 @@ private SharePartitionManager( int maxInFlightMessages, Persister persister, Metrics metrics, - DelayedOperationPurgatory delayedShareFetchPurgatory, - DelayedActionQueue delayedActionsQueue + DelayedOperationPurgatory delayedShareFetchPurgatory ) { this.replicaManager = replicaManager; this.time = time; @@ -235,7 +227,6 @@ private SharePartitionManager( this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; - this.delayedActionsQueue = delayedActionsQueue; } /** @@ -536,7 +527,6 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set topicIdPartitions.foreach(topicIdPartition => { + delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchKey(groupId, topicIdPartition)) + }) + } + } + /** * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; * the callback function will be triggered either when timeout or the required acks are satisfied; diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index dfac1de57925..3cccead0dbbe 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; @@ -29,11 +28,11 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.ShareFetchData; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -48,6 +47,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import scala.jdk.javaapi.CollectionConverters; @@ -356,13 +356,15 @@ public void testForceCompleteTriggersDelayedActionsQueue() { new CompletableFuture<>(), partitionMaxBytes2); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + // Counter for checking the number of times replicaManager.addCompleteDelayedShareFetchPurgatoryAction is called in the test. + AtomicInteger counter = new AtomicInteger(); + doAnswer(invocationOnMock -> counter.addAndGet(1)).when(replicaManager).addCompleteDelayedShareFetchPurgatoryAction( + CollectionConverters.asScala(Collections.singleton(tp1)).toSeq(), groupId, delayedShareFetchPurgatory); - DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue()); DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData2) .withReplicaManager(replicaManager) .withPartitionCacheMap(partitionCacheMap) - .withDelayedActionQueue(delayedActionQueue) .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) .build(); @@ -372,26 +374,21 @@ public void testForceCompleteTriggersDelayedActionsQueue() { when(sp1.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture( Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)))); - // run() method calls forceComplete followed by onExpiration. Since tp1 is common in between delayed share fetch - // requests, it should add a "check and complete" action for request key tp1 on the purgatory. When onExpiration - // gets called, then the delayedShareFetch1 should also be completed. - delayedShareFetch2.run(); + // when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch + // requests, it should add a "check and complete" action for request key tp1 on the purgatory. + delayedShareFetch2.forceComplete(); assertTrue(delayedShareFetch2.isCompleted()); assertTrue(shareFetchData2.future().isDone()); - Mockito.verify(replicaManager, times(2)).readFromLog( + Mockito.verify(replicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); - assertTrue(delayedShareFetch1.isCompleted()); - Mockito.verify(delayedActionQueue, times(1)).tryCompleteActions(); - assertTrue(shareFetchData1.future().isDone()); - // Only 1 watch key should be present in the purgatory corresponding to tp0. The entry corresponding to tp2 would have been completed. - assertEquals(1, delayedShareFetchPurgatory.watched()); + assertFalse(delayedShareFetch1.isCompleted()); + assertEquals(1, counter.get()); } static class DelayedShareFetchBuilder { ShareFetchData shareFetchData = mock(ShareFetchData.class); private ReplicaManager replicaManager = mock(ReplicaManager.class); private Map partitionCacheMap = new HashMap<>(); - private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) { @@ -409,11 +406,6 @@ DelayedShareFetchBuilder withPartitionCacheMap(Map delayedShareFetchPurgatory) { this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; return this; @@ -428,7 +420,6 @@ public DelayedShareFetch build() { shareFetchData, replicaManager, partitionCacheMap, - delayedActionsQueue, delayedShareFetchPurgatory); } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index a9035534edb0..8f66fca44ca0 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; import kafka.server.ReplicaManager; @@ -2298,7 +2297,6 @@ private static class SharePartitionManagerBuilder { private Metrics metrics = new Metrics(); private ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); - private final DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -2361,8 +2359,7 @@ public SharePartitionManager build() { MAX_IN_FLIGHT_MESSAGES, persister, metrics, - delayedShareFetchPurgatory, - delayedActionsQueue); + delayedShareFetchPurgatory); } } } From a036d5d7d48d36b662f95ff3535af4b0b3ae9bb7 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Tue, 1 Oct 2024 12:07:34 +0530 Subject: [PATCH 4/8] Addressed Jun's round 2 review comments --- .../builders/ReplicaManagerBuilder.java | 4 ++- .../kafka/server/share/DelayedShareFetch.java | 11 +++++++- .../server/share/DelayedShareFetchKey.java | 2 +- .../server/share/SharePartitionManager.java | 16 ++++++++++-- .../scala/kafka/server/BrokerServer.scala | 9 ++++++- .../scala/kafka/server/ReplicaManager.scala | 25 ++----------------- .../server/share/DelayedShareFetchTest.java | 18 ++++++++----- .../share/SharePartitionManagerTest.java | 5 +++- 8 files changed, 54 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 4ef1b871ec19..a694aa2a75ff 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -21,6 +21,7 @@ import kafka.log.remote.RemoteLogManager; import kafka.server.AddPartitionsToTxnManager; import kafka.server.AlterPartitionManager; +import kafka.server.DelayedActionQueue; import kafka.server.DelayedDeleteRecords; import kafka.server.DelayedElectLeader; import kafka.server.DelayedFetch; @@ -216,6 +217,7 @@ public ReplicaManager build() { OptionConverters.toScala(threadNamePrefix), () -> brokerEpoch, OptionConverters.toScala(addPartitionsToTxnManager), - directoryEventHandler); + directoryEventHandler, + new DelayedActionQueue()); } } diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 978ca2872ae5..1b3f3a50133f 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.server.ActionQueue; import kafka.server.DelayedOperation; import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; @@ -55,6 +56,7 @@ public class DelayedShareFetch extends DelayedOperation { private final ShareFetchData shareFetchData; private final ReplicaManager replicaManager; private final Map partitionCacheMap; + private final ActionQueue delayedActionQueue; private final DelayedOperationPurgatory delayedShareFetchPurgatory; private Map topicPartitionDataFromTryComplete; @@ -63,11 +65,13 @@ public class DelayedShareFetch extends DelayedOperation { ShareFetchData shareFetchData, ReplicaManager replicaManager, Map partitionCacheMap, + ActionQueue delayedActionQueue, DelayedOperationPurgatory delayedShareFetchPurgatory) { super(shareFetchData.fetchParams().maxWaitMs, Option.empty()); this.shareFetchData = shareFetchData; this.replicaManager = replicaManager; this.partitionCacheMap = partitionCacheMap; + this.delayedActionQueue = delayedActionQueue; this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; this.topicPartitionDataFromTryComplete = new LinkedHashMap<>(); } @@ -139,7 +143,12 @@ public void onComplete() { // then we should check if there is a pending share fetch request for the topic-partition and complete it. // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if // we directly call delayedShareFetchPurgatory.checkAndComplete - replicaManager.addCompleteDelayedShareFetchPurgatoryAction(CollectionConverters.asScala(result.keySet()).toSeq(), shareFetchData.groupId(), delayedShareFetchPurgatory); + delayedActionQueue.add(() -> { + result.keySet().forEach(topicIdPartition -> + delayedShareFetchPurgatory.checkAndComplete( + new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition))); + return BoxedUnit.UNIT; + }); }); } catch (Exception e) { diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java index 15c8f727ace2..a4c13f8f1a8f 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java @@ -28,7 +28,7 @@ */ public class DelayedShareFetchKey extends SharePartitionKey implements DelayedOperationKey { - public DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) { + DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) { super(groupId, topicIdPartition); } diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index eada97191f5f..1a4656b23455 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.server.ActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.ReplicaManager; @@ -147,6 +148,11 @@ public class SharePartitionManager implements AutoCloseable { */ private final DelayedOperationPurgatory delayedShareFetchPurgatory; + /** + * The delayed actions queue is used to complete any pending delayed share fetch actions. + */ + private final ActionQueue delayedActionsQueue; + public SharePartitionManager( ReplicaManager replicaManager, Time time, @@ -156,6 +162,7 @@ public SharePartitionManager( int maxInFlightMessages, int shareFetchPurgatoryPurgeIntervalRequests, Persister persister, + ActionQueue delayedActionsQueue, Metrics metrics ) { this(replicaManager, @@ -167,6 +174,7 @@ public SharePartitionManager( maxInFlightMessages, shareFetchPurgatoryPurgeIntervalRequests, persister, + delayedActionsQueue, metrics ); } @@ -181,6 +189,7 @@ private SharePartitionManager( int maxInFlightMessages, int shareFetchPurgatoryPurgeIntervalRequests, Persister persister, + ActionQueue delayedActionsQueue, Metrics metrics ) { this.replicaManager = replicaManager; @@ -197,6 +206,7 @@ private SharePartitionManager( this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true); + this.delayedActionsQueue = delayedActionsQueue; } // Visible for testing. @@ -212,7 +222,8 @@ private SharePartitionManager( int maxInFlightMessages, Persister persister, Metrics metrics, - DelayedOperationPurgatory delayedShareFetchPurgatory + DelayedOperationPurgatory delayedShareFetchPurgatory, + ActionQueue delayedActionsQueue ) { this.replicaManager = replicaManager; this.time = time; @@ -227,6 +238,7 @@ private SharePartitionManager( this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; + this.delayedActionsQueue = delayedActionsQueue; } /** @@ -600,7 +612,7 @@ void maybeProcessFetchQueue() { new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition))); // Add the share fetch to the delayed share fetch purgatory to process the fetch request. - addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedShareFetchPurgatory), + addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedActionsQueue, delayedShareFetchPurgatory), delayedShareFetchWatchKeys); // Release the lock so that other threads can process the queue. diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index f59b6bcae395..91b44ac4dd16 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -320,6 +320,11 @@ class BrokerServer( lifecycleManager.propagateDirectoryFailure(directoryId, config.logDirFailureTimeoutMs) } + /** + * TODO: move this action queue to handle thread so we can simplify concurrency handling + */ + val defaultActionQueue = new DelayedActionQueue + this._replicaManager = new ReplicaManager( config = config, metrics = metrics, @@ -338,7 +343,8 @@ class BrokerServer( delayedRemoteFetchPurgatoryParam = None, brokerEpochSupplier = () => lifecycleManager.brokerEpoch, addPartitionsToTxnManager = Some(addPartitionsToTxnManager), - directoryEventHandler = directoryEventHandler + directoryEventHandler = directoryEventHandler, + defaultActionQueue = defaultActionQueue ) /* start token manager */ @@ -423,6 +429,7 @@ class BrokerServer( config.shareGroupConfig.shareGroupPartitionMaxRecordLocks, config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests, persister, + defaultActionQueue, new Metrics() ) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ed98b139e859..58873cad3126 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -25,7 +25,6 @@ import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} import kafka.server.metadata.ZkMetadataCache -import kafka.server.share.{DelayedShareFetch, DelayedShareFetchKey} import kafka.utils.Implicits._ import kafka.utils._ import kafka.zk.KafkaZkClient @@ -291,7 +290,8 @@ class ReplicaManager(val config: KafkaConfig, threadNamePrefix: Option[String] = None, val brokerEpochSupplier: () => Long = () => -1, addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None, - val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP + val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP, + val defaultActionQueue: ActionQueue = new DelayedActionQueue ) extends Logging { private val metricsGroup = new KafkaMetricsGroup(this.getClass) @@ -742,29 +742,8 @@ class ReplicaManager(val config: KafkaConfig, localLog(topicPartition).map(_.parentDir) } - /** - * TODO: move this action queue to handle thread so we can simplify concurrency handling - */ - private val defaultActionQueue = new DelayedActionQueue - def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions() - /** - * Append an action to the DelayedActionQueue to complete any pending share fetch requests for the already fetched topic partitions. - * @param topicIdPartitions Topic partitions for which we know that the data has been fetched already - * @param groupId Share group's id - * @param delayedShareFetchPurgatory Purgatory storing the pending delayed share fetch requests - */ - def addCompleteDelayedShareFetchPurgatoryAction(topicIdPartitions: Seq[TopicIdPartition], - groupId: String, - delayedShareFetchPurgatory: DelayedOperationPurgatory[DelayedShareFetch]): Unit = { - defaultActionQueue.add { - () => topicIdPartitions.foreach(topicIdPartition => { - delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchKey(groupId, topicIdPartition)) - }) - } - } - /** * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; * the callback function will be triggered either when timeout or the required acks are satisfied; diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 3cccead0dbbe..6317c64c51f3 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; @@ -47,7 +48,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import scala.jdk.javaapi.CollectionConverters; @@ -356,15 +356,13 @@ public void testForceCompleteTriggersDelayedActionsQueue() { new CompletableFuture<>(), partitionMaxBytes2); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); - // Counter for checking the number of times replicaManager.addCompleteDelayedShareFetchPurgatoryAction is called in the test. - AtomicInteger counter = new AtomicInteger(); - doAnswer(invocationOnMock -> counter.addAndGet(1)).when(replicaManager).addCompleteDelayedShareFetchPurgatoryAction( - CollectionConverters.asScala(Collections.singleton(tp1)).toSeq(), groupId, delayedShareFetchPurgatory); + DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue()); DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData2) .withReplicaManager(replicaManager) .withPartitionCacheMap(partitionCacheMap) + .withDelayedActionQueue(delayedActionQueue) .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) .build(); @@ -382,13 +380,15 @@ public void testForceCompleteTriggersDelayedActionsQueue() { Mockito.verify(replicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); assertFalse(delayedShareFetch1.isCompleted()); - assertEquals(1, counter.get()); + Mockito.verify(delayedActionQueue, times(1)).add(any()); + Mockito.verify(delayedActionQueue, times(0)).tryCompleteActions(); } static class DelayedShareFetchBuilder { ShareFetchData shareFetchData = mock(ShareFetchData.class); private ReplicaManager replicaManager = mock(ReplicaManager.class); private Map partitionCacheMap = new HashMap<>(); + private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) { @@ -406,6 +406,11 @@ DelayedShareFetchBuilder withPartitionCacheMap(Map delayedShareFetchPurgatory) { this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; return this; @@ -420,6 +425,7 @@ public DelayedShareFetch build() { shareFetchData, replicaManager, partitionCacheMap, + delayedActionsQueue, delayedShareFetchPurgatory); } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 8f66fca44ca0..a9035534edb0 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.server.DelayedActionQueue; import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; import kafka.server.ReplicaManager; @@ -2297,6 +2298,7 @@ private static class SharePartitionManagerBuilder { private Metrics metrics = new Metrics(); private ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); + private final DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -2359,7 +2361,8 @@ public SharePartitionManager build() { MAX_IN_FLIGHT_MESSAGES, persister, metrics, - delayedShareFetchPurgatory); + delayedShareFetchPurgatory, + delayedActionsQueue); } } } From 14c5bc0dc290eac3eadda76bfcb4f598b15e6019 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 3 Oct 2024 10:54:27 +0530 Subject: [PATCH 5/8] Addressed Jun's round 3 review comments --- .../java/kafka/server/builders/KafkaApisBuilder.java | 11 ++++++++++- core/src/main/scala/kafka/server/BrokerServer.scala | 3 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++- core/src/main/scala/kafka/server/KafkaServer.scala | 3 ++- .../kafka/server/share/DelayedShareFetchTest.java | 2 +- .../test/scala/unit/kafka/server/KafkaApisTest.scala | 3 ++- .../jmh/metadata/KRaftMetadataRequestBenchmark.java | 2 ++ .../kafka/jmh/metadata/MetadataRequestBenchmark.java | 2 ++ 8 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index c4bc16ed24ac..e592d0bb6620 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -19,6 +19,7 @@ import kafka.coordinator.transaction.TransactionCoordinator; import kafka.network.RequestChannel; +import kafka.server.ActionQueue; import kafka.server.ApiVersionManager; import kafka.server.AutoTopicCreationManager; import kafka.server.DelegationTokenManager; @@ -69,6 +70,7 @@ public class KafkaApisBuilder { private ApiVersionManager apiVersionManager = null; private Optional clientMetricsManager = Optional.empty(); private Optional shareCoordinator = Optional.empty(); + private ActionQueue defaultActionQueue = null; public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) { this.requestChannel = requestChannel; @@ -180,6 +182,11 @@ public KafkaApisBuilder setClientMetricsManager(Optional c return this; } + public KafkaApisBuilder setDefaultActionQueue(ActionQueue defaultActionQueue) { + this.defaultActionQueue = defaultActionQueue; + return this; + } + public KafkaApis build() { if (requestChannel == null) throw new RuntimeException("you must set requestChannel"); if (metadataSupport == null) throw new RuntimeException("you must set metadataSupport"); @@ -196,6 +203,7 @@ public KafkaApis build() { if (fetchManager == null) throw new RuntimeException("You must set fetchManager"); if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); + if (defaultActionQueue == null) throw new RuntimeException("You must set defaultActionQueue"); return new KafkaApis(requestChannel, metadataSupport, @@ -218,6 +226,7 @@ public KafkaApis build() { time, tokenManager, apiVersionManager, - OptionConverters.toScala(clientMetricsManager)); + OptionConverters.toScala(clientMetricsManager), + defaultActionQueue); } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 91b44ac4dd16..89c95fdc1114 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -457,7 +457,8 @@ class BrokerServer( time = time, tokenManager = tokenManager, apiVersionManager = apiVersionManager, - clientMetricsManager = Some(clientMetricsManager)) + clientMetricsManager = Some(clientMetricsManager), + defaultActionQueue = defaultActionQueue) dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1af0a568c8d7..99efac1c4b3f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -120,7 +120,8 @@ class KafkaApis(val requestChannel: RequestChannel, time: Time, val tokenManager: DelegationTokenManager, val apiVersionManager: ApiVersionManager, - val clientMetricsManager: Option[ClientMetricsManager] + val clientMetricsManager: Option[ClientMetricsManager], + val defaultActionQueue: ActionQueue ) extends ApiRequestHandler with Logging { type FetchResponseStats = Map[TopicPartition, RecordValidationStats] diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1ebf6c7e8993..98c30ef3dfb4 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -600,7 +600,8 @@ class KafkaServer( time = time, tokenManager = tokenManager, apiVersionManager = apiVersionManager, - clientMetricsManager = None) + clientMetricsManager = None, + defaultActionQueue = new DelayedActionQueue) dataPlaneRequestProcessor = createKafkaApis(socketServer.dataPlaneRequestChannel) diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 6317c64c51f3..e4d0c84dfcd2 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -340,7 +340,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { .withPartitionCacheMap(partitionCacheMap) .build(); - // We add a delayed share fetch entry to the purgatory which will be waiting for completion since none of the + // We add a delayed share fetch entry to the purgatory which will be waiting for completion since neither of the // partitions in the share fetch request can be acquired. delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 45f0feb9528e..e3a31be7d23f 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -247,7 +247,8 @@ class KafkaApisTest extends Logging { time = time, tokenManager = null, apiVersionManager = apiVersionManager, - clientMetricsManager = clientMetricsManagerOpt) + clientMetricsManager = clientMetricsManagerOpt, + defaultActionQueue = new DelayedActionQueue) } private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index a56ff3400a7f..9ee1fe43c8dc 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -23,6 +23,7 @@ import kafka.server.ClientQuotaManager; import kafka.server.ClientRequestQuotaManager; import kafka.server.ControllerMutationQuotaManager; +import kafka.server.DelayedActionQueue; import kafka.server.FetchManager; import kafka.server.ForwardingManager; import kafka.server.KafkaApis; @@ -210,6 +211,7 @@ private KafkaApis createKafkaApis() { false, false, () -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))). + setDefaultActionQueue(new DelayedActionQueue()). build(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index eeebbfaa1a54..18328cd6f379 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -25,6 +25,7 @@ import kafka.server.ClientQuotaManager; import kafka.server.ClientRequestQuotaManager; import kafka.server.ControllerMutationQuotaManager; +import kafka.server.DelayedActionQueue; import kafka.server.FetchManager; import kafka.server.KafkaApis; import kafka.server.KafkaConfig; @@ -210,6 +211,7 @@ private KafkaApis createKafkaApis() { false, false, () -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))). + setDefaultActionQueue(new DelayedActionQueue()). build(); } From 901e26636cb704abd6383fcc9e4de4e83b7a926f Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 3 Oct 2024 11:13:52 +0530 Subject: [PATCH 6/8] Added usage of defaultActionQueue to KafkaApis --- core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 99efac1c4b3f..4fbba416f98d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -288,7 +288,7 @@ class KafkaApis(val requestChannel: RequestChannel, // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the // expiration thread for certain delayed operations (e.g. DelayedJoin) // Delayed fetches are also completed by ReplicaFetcherThread. - replicaManager.tryCompleteActions() + defaultActionQueue.tryCompleteActions() // The local completion time may be set while processing the request. Only record it if it's unset. if (request.apiLocalCompleteTimeNanos < 0) request.apiLocalCompleteTimeNanos = time.nanoseconds @@ -296,7 +296,7 @@ class KafkaApis(val requestChannel: RequestChannel, } override def tryCompleteActions(): Unit = { - replicaManager.tryCompleteActions() + defaultActionQueue.tryCompleteActions() } def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = { From 395881096d844741e4901fb74fc78216db59f029 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 3 Oct 2024 14:55:33 +0530 Subject: [PATCH 7/8] Fixed failing tests --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 +++- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4fbba416f98d..2f999ec725ff 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -288,7 +288,7 @@ class KafkaApis(val requestChannel: RequestChannel, // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the // expiration thread for certain delayed operations (e.g. DelayedJoin) // Delayed fetches are also completed by ReplicaFetcherThread. - defaultActionQueue.tryCompleteActions() + replicaManager.tryCompleteActions() // The local completion time may be set while processing the request. Only record it if it's unset. if (request.apiLocalCompleteTimeNanos < 0) request.apiLocalCompleteTimeNanos = time.nanoseconds diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 98c30ef3dfb4..f9e4c54664f8 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -375,6 +375,8 @@ class KafkaServer( None ) + val defaultActionQueue = new DelayedActionQueue + // Create and start the socket server acceptor threads so that the bound port is known. // Delay starting processors until the end of the initialization sequence to ensure // that credentials have been loaded before processing authentications. @@ -601,7 +603,7 @@ class KafkaServer( tokenManager = tokenManager, apiVersionManager = apiVersionManager, clientMetricsManager = None, - defaultActionQueue = new DelayedActionQueue) + defaultActionQueue = defaultActionQueue) dataPlaneRequestProcessor = createKafkaApis(socketServer.dataPlaneRequestChannel) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index e3a31be7d23f..56eb0a231240 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -147,6 +147,7 @@ class KafkaApisTest extends Logging { clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None) private val fetchManager: FetchManager = mock(classOf[FetchManager]) private val sharePartitionManager: SharePartitionManager = mock(classOf[SharePartitionManager]) + private val defaultActionQueue: ActionQueue = mock(classOf[ActionQueue]) private val clientMetricsManager: ClientMetricsManager = mock(classOf[ClientMetricsManager]) private val brokerTopicStats = new BrokerTopicStats private val clusterId = "clusterId" @@ -248,7 +249,7 @@ class KafkaApisTest extends Logging { tokenManager = null, apiVersionManager = apiVersionManager, clientMetricsManager = clientMetricsManagerOpt, - defaultActionQueue = new DelayedActionQueue) + defaultActionQueue = defaultActionQueue) } private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = { From 9b76f9b48c5928ad703f69307d102f8bb4a0f134 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Fri, 4 Oct 2024 00:16:11 +0530 Subject: [PATCH 8/8] Removed ActionQueue from KafkaApis --- .../java/kafka/server/builders/KafkaApisBuilder.java | 11 +---------- core/src/main/scala/kafka/server/BrokerServer.scala | 3 +-- core/src/main/scala/kafka/server/KafkaApis.scala | 5 ++--- core/src/main/scala/kafka/server/KafkaServer.scala | 5 +---- .../test/scala/unit/kafka/server/KafkaApisTest.scala | 4 +--- .../jmh/metadata/KRaftMetadataRequestBenchmark.java | 2 -- .../kafka/jmh/metadata/MetadataRequestBenchmark.java | 2 -- 7 files changed, 6 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index e592d0bb6620..c4bc16ed24ac 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -19,7 +19,6 @@ import kafka.coordinator.transaction.TransactionCoordinator; import kafka.network.RequestChannel; -import kafka.server.ActionQueue; import kafka.server.ApiVersionManager; import kafka.server.AutoTopicCreationManager; import kafka.server.DelegationTokenManager; @@ -70,7 +69,6 @@ public class KafkaApisBuilder { private ApiVersionManager apiVersionManager = null; private Optional clientMetricsManager = Optional.empty(); private Optional shareCoordinator = Optional.empty(); - private ActionQueue defaultActionQueue = null; public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) { this.requestChannel = requestChannel; @@ -182,11 +180,6 @@ public KafkaApisBuilder setClientMetricsManager(Optional c return this; } - public KafkaApisBuilder setDefaultActionQueue(ActionQueue defaultActionQueue) { - this.defaultActionQueue = defaultActionQueue; - return this; - } - public KafkaApis build() { if (requestChannel == null) throw new RuntimeException("you must set requestChannel"); if (metadataSupport == null) throw new RuntimeException("you must set metadataSupport"); @@ -203,7 +196,6 @@ public KafkaApis build() { if (fetchManager == null) throw new RuntimeException("You must set fetchManager"); if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); - if (defaultActionQueue == null) throw new RuntimeException("You must set defaultActionQueue"); return new KafkaApis(requestChannel, metadataSupport, @@ -226,7 +218,6 @@ public KafkaApis build() { time, tokenManager, apiVersionManager, - OptionConverters.toScala(clientMetricsManager), - defaultActionQueue); + OptionConverters.toScala(clientMetricsManager)); } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 89c95fdc1114..91b44ac4dd16 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -457,8 +457,7 @@ class BrokerServer( time = time, tokenManager = tokenManager, apiVersionManager = apiVersionManager, - clientMetricsManager = Some(clientMetricsManager), - defaultActionQueue = defaultActionQueue) + clientMetricsManager = Some(clientMetricsManager)) dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2f999ec725ff..1af0a568c8d7 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -120,8 +120,7 @@ class KafkaApis(val requestChannel: RequestChannel, time: Time, val tokenManager: DelegationTokenManager, val apiVersionManager: ApiVersionManager, - val clientMetricsManager: Option[ClientMetricsManager], - val defaultActionQueue: ActionQueue + val clientMetricsManager: Option[ClientMetricsManager] ) extends ApiRequestHandler with Logging { type FetchResponseStats = Map[TopicPartition, RecordValidationStats] @@ -296,7 +295,7 @@ class KafkaApis(val requestChannel: RequestChannel, } override def tryCompleteActions(): Unit = { - defaultActionQueue.tryCompleteActions() + replicaManager.tryCompleteActions() } def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f9e4c54664f8..1ebf6c7e8993 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -375,8 +375,6 @@ class KafkaServer( None ) - val defaultActionQueue = new DelayedActionQueue - // Create and start the socket server acceptor threads so that the bound port is known. // Delay starting processors until the end of the initialization sequence to ensure // that credentials have been loaded before processing authentications. @@ -602,8 +600,7 @@ class KafkaServer( time = time, tokenManager = tokenManager, apiVersionManager = apiVersionManager, - clientMetricsManager = None, - defaultActionQueue = defaultActionQueue) + clientMetricsManager = None) dataPlaneRequestProcessor = createKafkaApis(socketServer.dataPlaneRequestChannel) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 56eb0a231240..45f0feb9528e 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -147,7 +147,6 @@ class KafkaApisTest extends Logging { clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None) private val fetchManager: FetchManager = mock(classOf[FetchManager]) private val sharePartitionManager: SharePartitionManager = mock(classOf[SharePartitionManager]) - private val defaultActionQueue: ActionQueue = mock(classOf[ActionQueue]) private val clientMetricsManager: ClientMetricsManager = mock(classOf[ClientMetricsManager]) private val brokerTopicStats = new BrokerTopicStats private val clusterId = "clusterId" @@ -248,8 +247,7 @@ class KafkaApisTest extends Logging { time = time, tokenManager = null, apiVersionManager = apiVersionManager, - clientMetricsManager = clientMetricsManagerOpt, - defaultActionQueue = defaultActionQueue) + clientMetricsManager = clientMetricsManagerOpt) } private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index 9ee1fe43c8dc..a56ff3400a7f 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -23,7 +23,6 @@ import kafka.server.ClientQuotaManager; import kafka.server.ClientRequestQuotaManager; import kafka.server.ControllerMutationQuotaManager; -import kafka.server.DelayedActionQueue; import kafka.server.FetchManager; import kafka.server.ForwardingManager; import kafka.server.KafkaApis; @@ -211,7 +210,6 @@ private KafkaApis createKafkaApis() { false, false, () -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))). - setDefaultActionQueue(new DelayedActionQueue()). build(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 18328cd6f379..eeebbfaa1a54 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -25,7 +25,6 @@ import kafka.server.ClientQuotaManager; import kafka.server.ClientRequestQuotaManager; import kafka.server.ControllerMutationQuotaManager; -import kafka.server.DelayedActionQueue; import kafka.server.FetchManager; import kafka.server.KafkaApis; import kafka.server.KafkaConfig; @@ -211,7 +210,6 @@ private KafkaApis createKafkaApis() { false, false, () -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))). - setDefaultActionQueue(new DelayedActionQueue()). build(); }