Skip to content

KAFKA-19322: Remove the DelayedOperation constructor that accepts an external lock #19798

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 26, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public DelayedShareFetch(
Uuid fetchId,
long remoteFetchMaxWaitMs
) {
super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
super(shareFetch.fetchParams().maxWaitMs);
this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();
Expand Down
7 changes: 2 additions & 5 deletions core/src/main/scala/kafka/server/DelayedProduce.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.server

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.locks.Lock
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
import kafka.utils.Logging
Expand All @@ -30,7 +29,6 @@ import org.apache.kafka.server.purgatory.DelayedOperation

import scala.collection._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption

case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
@volatile var acksPending = false
Expand Down Expand Up @@ -59,9 +57,8 @@ object DelayedProduce {
class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
lockOpt: Option[Lock])
extends DelayedOperation(delayMs, lockOpt.toJava) with Logging {
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit)
extends DelayedOperation(delayMs) with Logging {

override lazy val logger: Logger = DelayedProduce.logger

Expand Down
7 changes: 1 addition & 6 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ import java.lang.{Long => JLong}
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future, RejectedExecutionException, TimeUnit}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import java.util.function.Consumer
Expand Down Expand Up @@ -723,7 +722,6 @@ class ReplicaManager(val config: KafkaConfig,
* If topic partition contains Uuid.ZERO_UUID as topicId the method
* will fall back to the old behaviour and rely on topic name.
* @param responseCallback callback for sending the response
* @param delayedProduceLock lock for the delayed actions
* @param recordValidationStatsCallback callback for updating stats on record conversions
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
* thread calling this method
Expand All @@ -735,7 +733,6 @@ class ReplicaManager(val config: KafkaConfig,
origin: AppendOrigin,
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.noCaching,
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
Expand All @@ -762,7 +759,6 @@ class ReplicaManager(val config: KafkaConfig,

maybeAddDelayedProduce(
requiredAcks,
delayedProduceLock,
timeout,
entriesPerPartition,
localProduceResults,
Expand Down Expand Up @@ -967,7 +963,6 @@ class ReplicaManager(val config: KafkaConfig,

private def maybeAddDelayedProduce(
requiredAcks: Short,
delayedProduceLock: Option[Lock],
timeoutMs: Long,
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
initialAppendResults: Map[TopicIdPartition, LogAppendResult],
Expand All @@ -977,7 +972,7 @@ class ReplicaManager(val config: KafkaConfig,
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove delayedProduceLock from method signature too


// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package kafka.coordinator
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors}
import java.util.{Collections, Random}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.cluster.Partition
import kafka.log.LogManager
Expand Down Expand Up @@ -216,7 +215,6 @@ object AbstractCoordinatorConcurrencyTest {
origin: AppendOrigin,
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
processingStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.noCaching,
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
Expand All @@ -227,7 +225,7 @@ object AbstractCoordinatorConcurrencyTest {
case (tp, _) =>
(tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
})
val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback, delayedProduceLock) {
val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback) {
// Complete produce requests after a few attempts to trigger delayed produce from different threads
val completeAttempts = new AtomicInteger
override def tryComplete(): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.coordinator.transaction
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kafka.server.ReplicaManager
import kafka.utils.TestUtils
Expand Down Expand Up @@ -758,7 +757,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any()
Expand Down Expand Up @@ -803,7 +801,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any()
Expand Down Expand Up @@ -847,7 +844,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any())
Expand Down Expand Up @@ -901,7 +897,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any()
Expand Down Expand Up @@ -1118,7 +1113,6 @@ class TransactionStateManagerTest {
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
callbackCapture.capture(),
any[Option[ReentrantLock]],
any(),
any(),
any()
Expand Down Expand Up @@ -1271,7 +1265,6 @@ class TransactionStateManagerTest {
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicIdPartition, MemoryRecords]],
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(),
any(),
any()
Expand Down
3 changes: 0 additions & 3 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2830,7 +2830,6 @@ class KafkaApisTest extends Logging {
any(),
responseCallback.capture(),
any(),
any(),
ArgumentMatchers.eq(requestLocal),
any()
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
Expand Down Expand Up @@ -2886,7 +2885,6 @@ class KafkaApisTest extends Logging {
any(),
any(),
any(),
any(),
ArgumentMatchers.eq(requestLocal),
any())
}
Expand Down Expand Up @@ -2965,7 +2963,6 @@ class KafkaApisTest extends Logging {
entriesPerPartition.capture(),
responseCallback.capture(),
any(),
any(),
ArgumentMatchers.eq(RequestLocal.noCaching),
any()
)).thenAnswer { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.kafka.server.util.timer.TimerTask;

import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand All @@ -41,19 +39,10 @@ public abstract class DelayedOperation extends TimerTask {

private volatile boolean completed = false;

protected final Lock lock;

public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
this(delayMs, lockOpt.orElse(new ReentrantLock()));
}
protected final ReentrantLock lock = new ReentrantLock();

public DelayedOperation(long delayMs) {
this(delayMs, new ReentrantLock());
}

public DelayedOperation(long delayMs, Lock lock) {
super(delayMs);
this.lock = lock;
}

/*
Expand Down