Skip to content

KAFKA-19144 Move DelayedProduce to server module #19793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from

Conversation

johnny94
Copy link
Contributor

This PR moves DelayedProduce to the server module. One notable change is that the type of the responseCallback parameter in ReplicaManager#appendRecords() has been changed to a Java Map. Other related type changes have been made accordingly.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker labels May 23, 2025
@johnny94 johnny94 force-pushed the kafka-19144_move_delayed_produce_to_server_module branch from 03e76dc to 9f2951e Compare May 23, 2025 17:04
Copy link
Member

@FrankYang0529 FrankYang0529 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch. Leave some minor comments.

Comment on lines 1011 to 1012
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(produceResponseStatus)
responseCallback(produceResponseStatus.asJava)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use initialProduceStatus to build a Scala map and transfer to Java. Probably, we can build a Java map directly, so we can avoid asJava here.

      val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
      initialProduceStatus.foreach { case (k, status) => produceResponseStatus.put(k, status.responseStatus()) }
      responseCallback(produceResponseStatus)

@@ -1006,7 +1026,7 @@ class ReplicaManager(val config: KafkaConfig,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
)
}
responseCallback(responseStatus)
responseCallback(responseStatus.asJava)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

    val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
    entries.foreach { case (topicIdPartition, _) =>
      responseStatus.put(topicIdPartition, new PartitionResponse(
        Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
        RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
      ))
    }
    responseCallback(responseStatus)

@github-actions github-actions bot removed the triage PRs from the community label May 25, 2025
@chia7712
Copy link
Member

I will review this PR after #19798 gets merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants