Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17509: Introduce a delayed action queue to complete purgatory actions outside purgatory. #17177

Merged
merged 9 commits into from
Oct 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import kafka.log.remote.RemoteLogManager;
import kafka.server.AddPartitionsToTxnManager;
import kafka.server.AlterPartitionManager;
import kafka.server.DelayedActionQueue;
import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
Expand Down Expand Up @@ -216,6 +217,7 @@ public ReplicaManager build() {
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager),
directoryEventHandler);
directoryEventHandler,
new DelayedActionQueue());
}
}
20 changes: 19 additions & 1 deletion core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package kafka.server.share;

import kafka.server.ActionQueue;
import kafka.server.DelayedOperation;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
Expand Down Expand Up @@ -54,17 +56,23 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetchData shareFetchData;
private final ReplicaManager replicaManager;
private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
private final ActionQueue delayedActionQueue;
private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;

private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;

DelayedShareFetch(
ShareFetchData shareFetchData,
ReplicaManager replicaManager,
Map<SharePartitionKey, SharePartition> partitionCacheMap) {
Map<SharePartitionKey, SharePartition> partitionCacheMap,
ActionQueue delayedActionQueue,
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
this.delayedActionQueue = delayedActionQueue;
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
}

Expand Down Expand Up @@ -131,6 +139,16 @@ public void onComplete() {
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
// If we have a fetch request completed for a topic-partition, we release the locks for that partition,
// then we should check if there is a pending share fetch request for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
delayedActionQueue.add(() -> {
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved
result.keySet().forEach(topicIdPartition ->
delayedShareFetchPurgatory.checkAndComplete(
new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));
return BoxedUnit.UNIT;
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about moving this logic to SharePartitionManager as discussed in #17177 (comment)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @junrao @mumrah , yes, I think we can move it outside DelayedShareFetch class to SharePartitionManager. Also, I'll also add TopicPartition as a key for delayed share fetch along with SharePartition (that is already present right now). I have compiled these details in JIRA https://issues.apache.org/jira/browse/KAFKA-17703. Will it be fine if we can merge this PR and I'll take up that JIRA in a future PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Sounds good to me.

});

} catch (Exception e) {
Expand Down
16 changes: 14 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.server.share;

import kafka.server.ActionQueue;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.ReplicaManager;

Expand Down Expand Up @@ -147,6 +148,11 @@ public class SharePartitionManager implements AutoCloseable {
*/
private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;

/**
* The delayed actions queue is used to complete any pending delayed share fetch actions.
*/
private final ActionQueue delayedActionsQueue;

public SharePartitionManager(
ReplicaManager replicaManager,
Time time,
Expand All @@ -156,6 +162,7 @@ public SharePartitionManager(
int maxInFlightMessages,
int shareFetchPurgatoryPurgeIntervalRequests,
Persister persister,
ActionQueue delayedActionsQueue,
Metrics metrics
) {
this(replicaManager,
Expand All @@ -167,6 +174,7 @@ public SharePartitionManager(
maxInFlightMessages,
shareFetchPurgatoryPurgeIntervalRequests,
persister,
delayedActionsQueue,
metrics
);
}
Expand All @@ -181,6 +189,7 @@ private SharePartitionManager(
int maxInFlightMessages,
int shareFetchPurgatoryPurgeIntervalRequests,
Persister persister,
ActionQueue delayedActionsQueue,
Metrics metrics
) {
this.replicaManager = replicaManager;
Expand All @@ -197,6 +206,7 @@ private SharePartitionManager(
this.persister = persister;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true);
this.delayedActionsQueue = delayedActionsQueue;
}

// Visible for testing.
Expand All @@ -212,7 +222,8 @@ private SharePartitionManager(
int maxInFlightMessages,
Persister persister,
Metrics metrics,
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
ActionQueue delayedActionsQueue
) {
this.replicaManager = replicaManager;
this.time = time;
Expand All @@ -227,6 +238,7 @@ private SharePartitionManager(
this.persister = persister;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
this.delayedActionsQueue = delayedActionsQueue;
}

/**
Expand Down Expand Up @@ -600,7 +612,7 @@ void maybeProcessFetchQueue() {
new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));

// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap),
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedActionsQueue, delayedShareFetchPurgatory),
delayedShareFetchWatchKeys);

// Release the lock so that other threads can process the queue.
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ class BrokerServer(
lifecycleManager.propagateDirectoryFailure(directoryId, config.logDirFailureTimeoutMs)
}

/**
* TODO: move this action queue to handle thread so we can simplify concurrency handling
*/
val defaultActionQueue = new DelayedActionQueue
adixitconfluent marked this conversation as resolved.
Show resolved Hide resolved

this._replicaManager = new ReplicaManager(
config = config,
metrics = metrics,
Expand All @@ -338,7 +343,8 @@ class BrokerServer(
delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
directoryEventHandler = directoryEventHandler
directoryEventHandler = directoryEventHandler,
defaultActionQueue = defaultActionQueue
)

/* start token manager */
Expand Down Expand Up @@ -423,6 +429,7 @@ class BrokerServer(
config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests,
persister,
defaultActionQueue,
new Metrics()
)

Expand Down
8 changes: 2 additions & 6 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ class ReplicaManager(val config: KafkaConfig,
threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP
val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
val defaultActionQueue: ActionQueue = new DelayedActionQueue
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)

Expand Down Expand Up @@ -741,11 +742,6 @@ class ReplicaManager(val config: KafkaConfig,
localLog(topicPartition).map(_.parentDir)
}

/**
* TODO: move this action queue to handle thread so we can simplify concurrency handling
*/
private val defaultActionQueue = new DelayedActionQueue

def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()

/**
Expand Down
Loading