Skip to content

KAFKA-19322: Remove the DelayedOperation constructor that accepts an external lock #19798

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 26, 2025

Conversation

Mirai1129
Copy link
Contributor

@Mirai1129 Mirai1129 commented May 23, 2025

Remove the DelayedOperation constructor that accepts an external lock.

According to this PR.

Reviewers: Ken Huang [email protected], PoAn Yang
[email protected], TengYao Chi [email protected], Chia-Ping Tsai
[email protected]

@github-actions github-actions bot added triage PRs from the community core Kafka Broker small Small PRs labels May 23, 2025
@chia7712
Copy link
Member

@Mirai1129 please fix the build by removing the usages of the removed constructor

@github-actions github-actions bot added the KIP-932 Queues for Kafka label May 23, 2025
@Mirai1129
Copy link
Contributor Author

@chia7712 Thank you so much! Fixed!

@github-actions github-actions bot removed the triage PRs from the community label May 25, 2025
@chia7712
Copy link
Member

@Mirai1129 please rebase the code to include #19759

# Conflicts:
#	server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
@Mirai1129
Copy link
Contributor Author

Done!

public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
this(delayMs, lockOpt.orElse(new ReentrantLock()));
}
protected final Lock lock = new ReentrantLock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please change the type from Lock to ReentrantLock? This class requires some features of ReentrantLock.

@@ -977,7 +977,7 @@ class ReplicaManager(val config: KafkaConfig,
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove delayedProduceLock from method signature too

@chia7712
Copy link
Member

Please cleanup appendRecords as well

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Mirai1129: LGTM

@chia7712 chia7712 merged commit 6e380fb into apache:trunk May 26, 2025
24 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka small Small PRs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants