Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17509: Introduce a delayed action queue to complete purgatory actions outside purgatory. #17177

Merged
merged 9 commits into from
Oct 4, 2024
11 changes: 10 additions & 1 deletion core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kafka.server.share;

import kafka.server.DelayedOperation;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
Expand Down Expand Up @@ -54,17 +55,20 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetchData shareFetchData;
private final ReplicaManager replicaManager;
private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;

private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;

DelayedShareFetch(
ShareFetchData shareFetchData,
ReplicaManager replicaManager,
Map<SharePartitionKey, SharePartition> partitionCacheMap) {
Map<SharePartitionKey, SharePartition> partitionCacheMap,
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
}

Expand Down Expand Up @@ -131,6 +135,11 @@ public void onComplete() {
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
// If we have a fetch request completed for a topic-partition, we release the locks for that partition,
// then we should check if there is a pending share fetch request for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
replicaManager.addCompleteDelayedShareFetchPurgatoryAction(CollectionConverters.asScala(result.keySet()).toSeq(), shareFetchData.groupId(), delayedShareFetchPurgatory);
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
});

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public class DelayedShareFetchKey extends SharePartitionKey implements DelayedOperationKey {

DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
public DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
super(groupId, topicIdPartition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ void maybeProcessFetchQueue() {
new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));

// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap),
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedShareFetchPurgatory),
delayedShareFetchWatchKeys);

// Release the lock so that other threads can process the queue.
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
import kafka.server.metadata.ZkMetadataCache
import kafka.server.share.{DelayedShareFetch, DelayedShareFetchKey}
import kafka.utils.Implicits._
import kafka.utils._
import kafka.zk.KafkaZkClient
Expand Down Expand Up @@ -748,6 +749,22 @@ class ReplicaManager(val config: KafkaConfig,

def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()

/**
* Append an action to the DelayedActionQueue to complete any pending share fetch requests for the already fetched topic partitions.
* @param topicIdPartitions Topic partitions for which we know that the data has been fetched already
* @param groupId Share group's id
* @param delayedShareFetchPurgatory Purgatory storing the pending delayed share fetch requests
*/
def addCompleteDelayedShareFetchPurgatoryAction(topicIdPartitions: Seq[TopicIdPartition],
groupId: String,
delayedShareFetchPurgatory: DelayedOperationPurgatory[DelayedShareFetch]): Unit = {
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
defaultActionQueue.add {
() => topicIdPartitions.foreach(topicIdPartition => {
delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchKey(groupId, topicIdPartition))
})
}
}

/**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
Expand Down
137 changes: 134 additions & 3 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.server.share;

import kafka.server.DelayedOperationPurgatory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;

Expand All @@ -29,22 +30,36 @@
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import scala.jdk.javaapi.CollectionConverters;

import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand All @@ -53,6 +68,18 @@

public class DelayedShareFetchTest {
private static final int MAX_WAIT_MS = 5000;
private static Timer mockTimer;

@BeforeEach
public void setUp() {
mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper",
new SystemTimer("DelayedShareFetchTestTimer"));
}

@AfterEach
public void tearDown() throws Exception {
mockTimer.close();
}

@Test
public void testDelayedShareFetchTryCompleteReturnsFalse() {
Expand Down Expand Up @@ -95,6 +122,7 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
public void testDelayedShareFetchTryCompleteReturnsTrue() {
String groupId = "grp";
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
Expand All @@ -118,9 +146,14 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(replicaManager)
.build();
assertFalse(delayedShareFetch.isCompleted());

Expand Down Expand Up @@ -200,6 +233,9 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
Expand Down Expand Up @@ -261,10 +297,99 @@ public void testToCompleteAnAlreadyCompletedFuture() {
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
}

@Test
public void testForceCompleteTriggersDelayedActionsQueue() {
String groupId = "grp";
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2));
Map<TopicIdPartition, Integer> partitionMaxBytes1 = new HashMap<>();
partitionMaxBytes1.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes1.put(tp1, PARTITION_MAX_BYTES);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
// No share partition is available for acquiring initially.
when(sp0.maybeAcquireFetchLock()).thenReturn(false);
when(sp1.maybeAcquireFetchLock()).thenReturn(false);
when(sp2.maybeAcquireFetchLock()).thenReturn(false);

Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);

ShareFetchData shareFetchData1 = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes1);

DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);

Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));

DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData1)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();

// We add a delayed share fetch entry to the purgatory which will be waiting for completion since none of the
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
// partitions in the share fetch request can be acquired.
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());

assertEquals(2, delayedShareFetchPurgatory.watched());
assertFalse(shareFetchData1.future().isDone());

Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
ShareFetchData shareFetchData2 = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
// Counter for checking the number of times replicaManager.addCompleteDelayedShareFetchPurgatoryAction is called in the test.
AtomicInteger counter = new AtomicInteger();
doAnswer(invocationOnMock -> counter.addAndGet(1)).when(replicaManager).addCompleteDelayedShareFetchPurgatoryAction(
CollectionConverters.asScala(Collections.singleton(tp1)).toSeq(), groupId, delayedShareFetchPurgatory);

DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData2)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.build();

// sp1 can be acquired now
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp1.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));

// when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
// requests, it should add a "check and complete" action for request key tp1 on the purgatory.
delayedShareFetch2.forceComplete();
assertTrue(delayedShareFetch2.isCompleted());
assertTrue(shareFetchData2.future().isDone());
Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertFalse(delayedShareFetch1.isCompleted());
assertEquals(1, counter.get());
}

static class DelayedShareFetchBuilder {
ShareFetchData shareFetchData = mock(ShareFetchData.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);

DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) {
this.shareFetchData = shareFetchData;
Expand All @@ -281,15 +406,21 @@ DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionKey, ShareParti
return this;
}

DelayedShareFetchBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
return this;
}

public static DelayedShareFetchBuilder builder() {
return new DelayedShareFetchBuilder();
}

public DelayedShareFetch build() {
return new DelayedShareFetch(
shareFetchData,
replicaManager,
partitionCacheMap);
shareFetchData,
replicaManager,
partitionCacheMap,
delayedShareFetchPurgatory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ public class SharePartitionManagerTest {
private static final int RECORD_LOCK_DURATION_MS = 30000;
private static final int MAX_DELIVERY_COUNT = 5;
private static final short MAX_IN_FLIGHT_MESSAGES = 200;
static final int PARTITION_MAX_BYTES = 40000;
private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
private static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
static final int PARTITION_MAX_BYTES = 40000;
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;

private static Timer mockTimer;

Expand Down Expand Up @@ -2270,7 +2270,7 @@ private void assertErroneousAndValidTopicIdPartitions(
assertEquals(expectedValidSet, actualValidPartitions);
}

private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Expand Down