Skip to content

Commit 6e380fb

Browse files
authored
KAFKA-19322 Remove the DelayedOperation constructor that accepts an external lock (#19798)
Remove the DelayedOperation constructor that accepts an external lock. According to this [PR](#19759). Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 0600abd commit 6e380fb

File tree

7 files changed

+6
-37
lines changed

7 files changed

+6
-37
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public DelayedShareFetch(
172172
Uuid fetchId,
173173
long remoteFetchMaxWaitMs
174174
) {
175-
super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
175+
super(shareFetch.fetchParams().maxWaitMs);
176176
this.shareFetch = shareFetch;
177177
this.replicaManager = replicaManager;
178178
this.partitionsAcquired = new LinkedHashMap<>();

core/src/main/scala/kafka/server/DelayedProduce.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package kafka.server
1919

2020
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
21-
import java.util.concurrent.locks.Lock
2221
import com.typesafe.scalalogging.Logger
2322
import com.yammer.metrics.core.Meter
2423
import kafka.utils.Logging
@@ -30,7 +29,6 @@ import org.apache.kafka.server.purgatory.DelayedOperation
3029

3130
import scala.collection._
3231
import scala.jdk.CollectionConverters._
33-
import scala.jdk.OptionConverters.RichOption
3432

3533
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
3634
@volatile var acksPending = false
@@ -59,9 +57,8 @@ object DelayedProduce {
5957
class DelayedProduce(delayMs: Long,
6058
produceMetadata: ProduceMetadata,
6159
replicaManager: ReplicaManager,
62-
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
63-
lockOpt: Option[Lock])
64-
extends DelayedOperation(delayMs, lockOpt.toJava) with Logging {
60+
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit)
61+
extends DelayedOperation(delayMs) with Logging {
6562

6663
override lazy val logger: Logger = DelayedProduce.logger
6764

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ import java.lang.{Long => JLong}
7171
import java.nio.file.{Files, Paths}
7272
import java.util
7373
import java.util.concurrent.atomic.AtomicBoolean
74-
import java.util.concurrent.locks.Lock
7574
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future, RejectedExecutionException, TimeUnit}
7675
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
7776
import java.util.function.Consumer
@@ -723,7 +722,6 @@ class ReplicaManager(val config: KafkaConfig,
723722
* If topic partition contains Uuid.ZERO_UUID as topicId the method
724723
* will fall back to the old behaviour and rely on topic name.
725724
* @param responseCallback callback for sending the response
726-
* @param delayedProduceLock lock for the delayed actions
727725
* @param recordValidationStatsCallback callback for updating stats on record conversions
728726
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
729727
* thread calling this method
@@ -735,7 +733,6 @@ class ReplicaManager(val config: KafkaConfig,
735733
origin: AppendOrigin,
736734
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
737735
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
738-
delayedProduceLock: Option[Lock] = None,
739736
recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
740737
requestLocal: RequestLocal = RequestLocal.noCaching,
741738
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
@@ -762,7 +759,6 @@ class ReplicaManager(val config: KafkaConfig,
762759

763760
maybeAddDelayedProduce(
764761
requiredAcks,
765-
delayedProduceLock,
766762
timeout,
767763
entriesPerPartition,
768764
localProduceResults,
@@ -967,7 +963,6 @@ class ReplicaManager(val config: KafkaConfig,
967963

968964
private def maybeAddDelayedProduce(
969965
requiredAcks: Short,
970-
delayedProduceLock: Option[Lock],
971966
timeoutMs: Long,
972967
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
973968
initialAppendResults: Map[TopicIdPartition, LogAppendResult],
@@ -977,7 +972,7 @@ class ReplicaManager(val config: KafkaConfig,
977972
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
978973
// create delayed produce operation
979974
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
980-
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
975+
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback)
981976

982977
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
983978
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList

core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package kafka.coordinator
2020
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors}
2121
import java.util.{Collections, Random}
2222
import java.util.concurrent.atomic.AtomicInteger
23-
import java.util.concurrent.locks.Lock
2423
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
2524
import kafka.cluster.Partition
2625
import kafka.log.LogManager
@@ -216,7 +215,6 @@ object AbstractCoordinatorConcurrencyTest {
216215
origin: AppendOrigin,
217216
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
218217
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
219-
delayedProduceLock: Option[Lock] = None,
220218
processingStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
221219
requestLocal: RequestLocal = RequestLocal.noCaching,
222220
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
@@ -227,7 +225,7 @@ object AbstractCoordinatorConcurrencyTest {
227225
case (tp, _) =>
228226
(tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
229227
})
230-
val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback, delayedProduceLock) {
228+
val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback) {
231229
// Complete produce requests after a few attempts to trigger delayed produce from different threads
232230
val completeAttempts = new AtomicInteger
233231
override def tryComplete(): Boolean = {

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kafka.coordinator.transaction
1919
import java.lang.management.ManagementFactory
2020
import java.nio.ByteBuffer
2121
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
22-
import java.util.concurrent.locks.ReentrantLock
2322
import javax.management.ObjectName
2423
import kafka.server.ReplicaManager
2524
import kafka.utils.TestUtils
@@ -758,7 +757,6 @@ class TransactionStateManagerTest {
758757
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
759758
any(),
760759
any(),
761-
any[Option[ReentrantLock]],
762760
any(),
763761
any(),
764762
any()
@@ -803,7 +801,6 @@ class TransactionStateManagerTest {
803801
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
804802
any(),
805803
any(),
806-
any[Option[ReentrantLock]],
807804
any(),
808805
any(),
809806
any()
@@ -847,7 +844,6 @@ class TransactionStateManagerTest {
847844
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
848845
any(),
849846
any(),
850-
any[Option[ReentrantLock]],
851847
any(),
852848
any(),
853849
any())
@@ -901,7 +897,6 @@ class TransactionStateManagerTest {
901897
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
902898
any(),
903899
any(),
904-
any[Option[ReentrantLock]],
905900
any(),
906901
any(),
907902
any()
@@ -1118,7 +1113,6 @@ class TransactionStateManagerTest {
11181113
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
11191114
recordsCapture.capture(),
11201115
callbackCapture.capture(),
1121-
any[Option[ReentrantLock]],
11221116
any(),
11231117
any(),
11241118
any()
@@ -1271,7 +1265,6 @@ class TransactionStateManagerTest {
12711265
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
12721266
any[Map[TopicIdPartition, MemoryRecords]],
12731267
capturedArgument.capture(),
1274-
any[Option[ReentrantLock]],
12751268
any(),
12761269
any(),
12771270
any()

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2830,7 +2830,6 @@ class KafkaApisTest extends Logging {
28302830
any(),
28312831
responseCallback.capture(),
28322832
any(),
2833-
any(),
28342833
ArgumentMatchers.eq(requestLocal),
28352834
any()
28362835
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
@@ -2886,7 +2885,6 @@ class KafkaApisTest extends Logging {
28862885
any(),
28872886
any(),
28882887
any(),
2889-
any(),
28902888
ArgumentMatchers.eq(requestLocal),
28912889
any())
28922890
}
@@ -2965,7 +2963,6 @@ class KafkaApisTest extends Logging {
29652963
entriesPerPartition.capture(),
29662964
responseCallback.capture(),
29672965
any(),
2968-
any(),
29692966
ArgumentMatchers.eq(RequestLocal.noCaching),
29702967
any()
29712968
)).thenAnswer { _ =>

server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import org.apache.kafka.server.util.timer.TimerTask;
2020

21-
import java.util.Optional;
22-
import java.util.concurrent.locks.Lock;
2321
import java.util.concurrent.locks.ReentrantLock;
2422

2523
/**
@@ -41,19 +39,10 @@ public abstract class DelayedOperation extends TimerTask {
4139

4240
private volatile boolean completed = false;
4341

44-
protected final Lock lock;
45-
46-
public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
47-
this(delayMs, lockOpt.orElse(new ReentrantLock()));
48-
}
42+
protected final ReentrantLock lock = new ReentrantLock();
4943

5044
public DelayedOperation(long delayMs) {
51-
this(delayMs, new ReentrantLock());
52-
}
53-
54-
public DelayedOperation(long delayMs, Lock lock) {
5545
super(delayMs);
56-
this.lock = lock;
5746
}
5847

5948
/*

0 commit comments

Comments
 (0)