Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17509: Introduce a delayed action queue to complete purgatory actions outside purgatory. #17177

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package kafka.server.share;

import kafka.server.DelayedActionQueue;
import kafka.server.DelayedOperation;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
Expand Down Expand Up @@ -50,21 +52,32 @@ public class DelayedShareFetch extends DelayedOperation {
private final ReplicaManager replicaManager;
private final Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap;
private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete = new LinkedHashMap<>();
private final DelayedActionQueue delayedActionQueue;
private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;

private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);

DelayedShareFetch(
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData,
ReplicaManager replicaManager,
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap) {
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap,
DelayedActionQueue delayedActionQueue,
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchPartitionData = shareFetchPartitionData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
this.delayedActionQueue = delayedActionQueue;
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Complete the delayed share fetch actions that were added to the queue. Since onExpiration serves as a callback for
* forceComplete, it should not lead to infinite call stack.
*/
@Override
public void onExpiration() {
delayedActionQueue.tryCompleteActions();
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -126,6 +139,16 @@ public void onComplete() {
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet());
// If we have a fetch request completed for a topic-partition, we release the locks for that partition,
// then we should check if there is a pending share fetch request for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
delayedActionQueue.add(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent I'm a little confused by the async code here. We are gathering some futures in ShareFetchUtils#processFetchResponse, but when I look down into SharePartition#acquire it's all synchronous/blocking code (it just returns a completed CompletableFuture).

Is this some leftovers from the refactoring? Or do we intend to make SharePartition#acquire async?

I ask this because if we're not keeping the CompletableFuture return type in SharePartition#acquire, we can fix it in this PR and avoid some complexity here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mumrah , we created a JIRA https://issues.apache.org/jira/browse/KAFKA-17522 for tracking this issue earlier. Yes, it makes sense that share partition acquire functionality need not return a future. I am not sure whether I should cover it in this PR itself.
@apoorvmittal10 any thoughts whether we should cover it in this PR or since the JIRA is assigned to you, if you're working on it already, we can have another PR for the resolution?

result.keySet().forEach(topicIdPartition ->
delayedShareFetchPurgatory.checkAndComplete(
new DelayedShareFetchKey(shareFetchPartitionData.groupId(), topicIdPartition)));
return BoxedUnit.UNIT;
});
});

} catch (Exception e) {
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.server.share;

import kafka.server.DelayedActionQueue;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.ReplicaManager;

Expand Down Expand Up @@ -145,6 +146,11 @@ public class SharePartitionManager implements AutoCloseable {
*/
private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;

/**
* The delayed actions queue is used to complete any pending delayed share fetch actions.
*/
private final DelayedActionQueue delayedActionsQueue;

public SharePartitionManager(
ReplicaManager replicaManager,
Time time,
Expand Down Expand Up @@ -195,6 +201,7 @@ private SharePartitionManager(
this.persister = persister;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true);
this.delayedActionsQueue = new DelayedActionQueue();
}

// Visible for testing.
Expand All @@ -210,7 +217,8 @@ private SharePartitionManager(
int maxInFlightMessages,
Persister persister,
Metrics metrics,
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
DelayedActionQueue delayedActionsQueue
) {
this.replicaManager = replicaManager;
this.time = time;
Expand All @@ -225,6 +233,7 @@ private SharePartitionManager(
this.persister = persister;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
this.delayedActionsQueue = delayedActionsQueue;
}

/**
Expand Down Expand Up @@ -524,6 +533,7 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<Objec

@Override
public void close() throws Exception {
this.delayedActionsQueue.tryCompleteActions();
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
this.delayedShareFetchPurgatory.shutdown();
this.timer.close();
this.persister.stop();
Expand Down Expand Up @@ -597,7 +607,7 @@ void maybeProcessFetchQueue() {
new DelayedShareFetchKey(shareFetchPartitionData.groupId, topicIdPartition)));

// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),
addDelayedShareFetch(new DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap, delayedActionsQueue, delayedShareFetchPurgatory),
delayedShareFetchWatchKeys);

// Release the lock so that other threads can process the queue.
Expand Down
146 changes: 143 additions & 3 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package kafka.server.share;

import kafka.server.DelayedActionQueue;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;

Expand All @@ -25,24 +27,37 @@
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import scala.jdk.javaapi.CollectionConverters;

import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand All @@ -51,6 +66,18 @@

public class DelayedShareFetchTest {
private static final int MAX_WAIT_MS = 5000;
private static Timer mockTimer;

@BeforeEach
public void setUp() {
mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper",
new SystemTimer("DelayedShareFetchTestTimer"));
}

@AfterEach
public void tearDown() throws Exception {
mockTimer.close();
}

@Test
public void testDelayedShareFetchTryCompleteReturnsFalse() {
Expand Down Expand Up @@ -93,6 +120,7 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
public void testDelayedShareFetchTryCompleteReturnsTrue() {
String groupId = "grp";
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
Expand All @@ -116,9 +144,14 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(replicaManager)
.build();
assertFalse(delayedShareFetch.isCompleted());

Expand Down Expand Up @@ -198,6 +231,9 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withReplicaManager(replicaManager)
Expand Down Expand Up @@ -259,10 +295,102 @@ public void testToCompleteAnAlreadyCompletedFuture() {
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
}

@Test
public void testForceCompleteTriggersDelayedActionsQueue() {
String groupId = "grp";
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2));
Map<TopicIdPartition, Integer> partitionMaxBytes1 = new HashMap<>();
partitionMaxBytes1.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes1.put(tp1, PARTITION_MAX_BYTES);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
// No share partition is available for acquiring initially.
when(sp0.maybeAcquireFetchLock()).thenReturn(false);
when(sp1.maybeAcquireFetchLock()).thenReturn(false);
when(sp2.maybeAcquireFetchLock()).thenReturn(false);

Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);

SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 = new SharePartitionManager.ShareFetchPartitionData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes1);

DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);

Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));

DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData1)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();

// We add a delayed share fetch entry to the purgatory which will be waiting for completion since none of the
// partitions in the share fetch request can be acquired.
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());

assertEquals(2, delayedShareFetchPurgatory.watched());
assertFalse(shareFetchPartitionData1.future().isDone());

Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 = new SharePartitionManager.ShareFetchPartitionData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue());
DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData2)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.withDelayedActionQueue(delayedActionQueue)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.build();

// sp1 can be acquired now
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp1.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));

// run() method calls forceComplete followed by onExpiration. Since tp1 is common in between delayed share fetch
// requests, it should add a "check and complete" action for request key tp1 on the purgatory. When onExpiration
// gets called, then the delayedShareFetch1 should also be completed.
delayedShareFetch2.run();
assertTrue(delayedShareFetch2.isCompleted());
assertTrue(shareFetchPartitionData2.future().isDone());
Mockito.verify(replicaManager, times(2)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertTrue(delayedShareFetch1.isCompleted());
Mockito.verify(delayedActionQueue, times(1)).tryCompleteActions();
assertTrue(shareFetchPartitionData1.future().isDone());
// Only 1 watch key should be present in the purgatory corresponding to tp0. The entry corresponding to tp2 would have been completed.
assertEquals(1, delayedShareFetchPurgatory.watched());
}

static class DelayedShareFetchBuilder {
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = mock(SharePartitionManager.ShareFetchPartitionData.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class);
private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);

DelayedShareFetchBuilder withShareFetchPartitionData(SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData) {
this.shareFetchPartitionData = shareFetchPartitionData;
Expand All @@ -279,15 +407,27 @@ DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionManager.SharePa
return this;
}

DelayedShareFetchBuilder withDelayedActionQueue(DelayedActionQueue delayedActionsQueue) {
this.delayedActionsQueue = delayedActionsQueue;
return this;
}

DelayedShareFetchBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
return this;
}

public static DelayedShareFetchBuilder builder() {
return new DelayedShareFetchBuilder();
}

public DelayedShareFetch build() {
return new DelayedShareFetch(
shareFetchPartitionData,
replicaManager,
partitionCacheMap);
shareFetchPartitionData,
replicaManager,
partitionCacheMap,
delayedActionsQueue,
delayedShareFetchPurgatory);
}
}
}
Loading