From cec1ac8ad7fcfe930ed16e03988021daf5779853 Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Fri, 23 May 2025 17:26:09 +0000 Subject: [PATCH 1/8] KAFKA-19322: Remove DelayedOperation constructor that accepts external lock --- .../kafka/server/purgatory/DelayedOperation.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java index f3c818cb9c6c6..aa40288a39ee1 100644 --- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java +++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java @@ -43,19 +43,10 @@ public abstract class DelayedOperation extends TimerTask { private final AtomicBoolean completed = new AtomicBoolean(false); - protected final Lock lock; - - public DelayedOperation(long delayMs, Optional lockOpt) { - this(delayMs, lockOpt.orElse(new ReentrantLock())); - } + protected final Lock lock = new ReentrantLock(); public DelayedOperation(long delayMs) { - this(delayMs, new ReentrantLock()); - } - - public DelayedOperation(long delayMs, Lock lock) { super(delayMs); - this.lock = lock; } /* From dd5955646f5e15007cdc003aa643f4a50b52d32b Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Fri, 23 May 2025 18:28:33 +0000 Subject: [PATCH 2/8] remove all usages of DelayedOperation external lock constructors --- .../java/kafka/server/share/DelayedShareFetch.java | 2 +- core/src/main/scala/kafka/server/DelayedProduce.scala | 11 ++++------- core/src/main/scala/kafka/server/ReplicaManager.scala | 2 +- .../AbstractCoordinatorConcurrencyTest.scala | 2 +- .../kafka/server/purgatory/DelayedOperation.java | 1 - 5 files changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 0a46e834eb7f5..8108681f54ed8 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -172,7 +172,7 @@ public DelayedShareFetch( Uuid fetchId, long remoteFetchMaxWaitMs ) { - super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); + super(shareFetch.fetchParams().maxWaitMs); this.shareFetch = shareFetch; this.replicaManager = replicaManager; this.partitionsAcquired = new LinkedHashMap<>(); diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index b60c79125b14f..8f24363eeb49b 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -17,20 +17,18 @@ package kafka.server -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} -import java.util.concurrent.locks.Lock import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.Meter import kafka.utils.Logging -import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.DelayedOperation +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection._ import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.RichOption case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) { @volatile var acksPending = false @@ -59,9 +57,8 @@ object DelayedProduce { class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, - lockOpt: Option[Lock]) - extends DelayedOperation(delayMs, lockOpt.toJava) with Logging { + responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit) + extends DelayedOperation(delayMs) with Logging { override lazy val logger: Logger = DelayedProduce.logger diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b0b2e72602a7a..fc7ec79fd5e65 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -977,7 +977,7 @@ class ReplicaManager(val config: KafkaConfig, if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus) - val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock) + val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed produce operation val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 56ccb8229475d..847ea5ab2abe8 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -227,7 +227,7 @@ object AbstractCoordinatorConcurrencyTest { case (tp, _) => (tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))) }) - val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback, delayedProduceLock) { + val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback) { // Complete produce requests after a few attempts to trigger delayed produce from different threads val completeAttempts = new AtomicInteger override def tryComplete(): Boolean = { diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java index aa40288a39ee1..881719faa69ac 100644 --- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java +++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java @@ -18,7 +18,6 @@ import org.apache.kafka.server.util.timer.TimerTask; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; From 6dd739895bdd78f7d73865443d5502310bc8086b Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Sat, 24 May 2025 09:21:50 +0000 Subject: [PATCH 3/8] fix imports order --- core/src/main/scala/kafka/server/DelayedProduce.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 8f24363eeb49b..1d21ec78e4c63 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -17,16 +17,16 @@ package kafka.server +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.Meter import kafka.utils.Logging +import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse -import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.DelayedOperation -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection._ import scala.jdk.CollectionConverters._ From 3e50e6900837d5cdc9464449796be3982d1f6019 Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Sun, 25 May 2025 07:05:36 +0000 Subject: [PATCH 4/8] Remove DelayedOperation constructor that accepts external lock --- .../kafka/server/purgatory/DelayedOperation.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java index 16bfdabdb67a3..922a0285fb758 100644 --- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java +++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java @@ -18,7 +18,6 @@ import org.apache.kafka.server.util.timer.TimerTask; -import java.util.Optional; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -41,19 +40,10 @@ public abstract class DelayedOperation extends TimerTask { private volatile boolean completed = false; - protected final Lock lock; - - public DelayedOperation(long delayMs, Optional lockOpt) { - this(delayMs, lockOpt.orElse(new ReentrantLock())); - } + protected final Lock lock = new ReentrantLock(); public DelayedOperation(long delayMs) { - this(delayMs, new ReentrantLock()); - } - - public DelayedOperation(long delayMs, Lock lock) { super(delayMs); - this.lock = lock; } /* From b436694e488c56f05b44f9ea47190c672e7cb27f Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Sun, 25 May 2025 18:10:56 +0000 Subject: [PATCH 5/8] change `lock` type and remove signature --- core/src/main/scala/kafka/server/ReplicaManager.scala | 1 - .../org/apache/kafka/server/purgatory/DelayedOperation.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fc7ec79fd5e65..53da673f5a324 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -967,7 +967,6 @@ class ReplicaManager(val config: KafkaConfig, private def maybeAddDelayedProduce( requiredAcks: Short, - delayedProduceLock: Option[Lock], timeoutMs: Long, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], initialAppendResults: Map[TopicIdPartition, LogAppendResult], diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java index 922a0285fb758..edf9b55c86512 100644 --- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java +++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java @@ -40,7 +40,7 @@ public abstract class DelayedOperation extends TimerTask { private volatile boolean completed = false; - protected final Lock lock = new ReentrantLock(); + protected final ReentrantLock lock = new ReentrantLock(); public DelayedOperation(long delayMs) { super(delayMs); From 981c60696c19515bef199f8f90d91aac90f47c84 Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Sun, 25 May 2025 18:12:58 +0000 Subject: [PATCH 6/8] change `lock` type and remove signature --- .../java/org/apache/kafka/server/purgatory/DelayedOperation.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java index edf9b55c86512..82b91c44cca84 100644 --- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java +++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java @@ -18,7 +18,6 @@ import org.apache.kafka.server.util.timer.TimerTask; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** From 65d81e1c2b5741f4aab1feaa673939efe5bf1300 Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Sun, 25 May 2025 18:19:56 +0000 Subject: [PATCH 7/8] remove method signature --- core/src/main/scala/kafka/server/ReplicaManager.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 53da673f5a324..da66aed2744d5 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -762,7 +762,6 @@ class ReplicaManager(val config: KafkaConfig, maybeAddDelayedProduce( requiredAcks, - delayedProduceLock, timeout, entriesPerPartition, localProduceResults, From df29ab988958646b47d3dcfe9f562545876bd27c Mon Sep 17 00:00:00 2001 From: YuChia Ma Date: Sun, 25 May 2025 19:53:08 +0000 Subject: [PATCH 8/8] cleanup `appendRecords` --- core/src/main/scala/kafka/server/ReplicaManager.scala | 3 --- .../coordinator/AbstractCoordinatorConcurrencyTest.scala | 2 -- .../transaction/TransactionStateManagerTest.scala | 7 ------- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 3 --- 4 files changed, 15 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index da66aed2744d5..ba34b2af13256 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -71,7 +71,6 @@ import java.lang.{Long => JLong} import java.nio.file.{Files, Paths} import java.util import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.Lock import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future, RejectedExecutionException, TimeUnit} import java.util.{Collections, Optional, OptionalInt, OptionalLong} import java.util.function.Consumer @@ -723,7 +722,6 @@ class ReplicaManager(val config: KafkaConfig, * If topic partition contains Uuid.ZERO_UUID as topicId the method * will fall back to the old behaviour and rely on topic name. * @param responseCallback callback for sending the response - * @param delayedProduceLock lock for the delayed actions * @param recordValidationStatsCallback callback for updating stats on record conversions * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the * thread calling this method @@ -735,7 +733,6 @@ class ReplicaManager(val config: KafkaConfig, origin: AppendOrigin, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Lock] = None, recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.noCaching, verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 847ea5ab2abe8..2e9f95beb512e 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -20,7 +20,6 @@ package kafka.coordinator import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors} import java.util.{Collections, Random} import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.locks.Lock import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ import kafka.cluster.Partition import kafka.log.LogManager @@ -216,7 +215,6 @@ object AbstractCoordinatorConcurrencyTest { origin: AppendOrigin, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Lock] = None, processingStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.noCaching, verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 5923566d92a2a..212a915c80096 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -19,7 +19,6 @@ package kafka.coordinator.transaction import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} -import java.util.concurrent.locks.ReentrantLock import javax.management.ObjectName import kafka.server.ReplicaManager import kafka.utils.TestUtils @@ -758,7 +757,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any() @@ -803,7 +801,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any() @@ -847,7 +844,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any()) @@ -901,7 +897,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any() @@ -1118,7 +1113,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), recordsCapture.capture(), callbackCapture.capture(), - any[Option[ReentrantLock]], any(), any(), any() @@ -1271,7 +1265,6 @@ class TransactionStateManagerTest { origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any[Map[TopicIdPartition, MemoryRecords]], capturedArgument.capture(), - any[Option[ReentrantLock]], any(), any(), any() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5155b9f3aed7a..c0e6c7b9c91f4 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2830,7 +2830,6 @@ class KafkaApisTest extends Logging { any(), responseCallback.capture(), any(), - any(), ArgumentMatchers.eq(requestLocal), any() )).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)))) @@ -2886,7 +2885,6 @@ class KafkaApisTest extends Logging { any(), any(), any(), - any(), ArgumentMatchers.eq(requestLocal), any()) } @@ -2965,7 +2963,6 @@ class KafkaApisTest extends Logging { entriesPerPartition.capture(), responseCallback.capture(), any(), - any(), ArgumentMatchers.eq(RequestLocal.noCaching), any() )).thenAnswer { _ =>