-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19144 Move DelayedProduce to server module #19793
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-19144 Move DelayedProduce to server module #19793
Conversation
03e76dc
to
9f2951e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch. Leave some minor comments.
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus } | ||
responseCallback(produceResponseStatus) | ||
responseCallback(produceResponseStatus.asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use initialProduceStatus
to build a Scala map and transfer to Java. Probably, we can build a Java map directly, so we can avoid asJava
here.
val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
initialProduceStatus.foreach { case (k, status) => produceResponseStatus.put(k, status.responseStatus()) }
responseCallback(produceResponseStatus)
@@ -1006,7 +1026,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset | |||
) | |||
} | |||
responseCallback(responseStatus) | |||
responseCallback(responseStatus.asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
entries.foreach { case (topicIdPartition, _) =>
responseStatus.put(topicIdPartition, new PartitionResponse(
Errors.INVALID_REQUIRED_ACKS,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
RecordBatch.NO_TIMESTAMP,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
))
}
responseCallback(responseStatus)
I will review this PR after #19798 gets merged. |
This PR moves
DelayedProduce
to the server module. One notable change is that the type of theresponseCallback
parameter inReplicaManager#appendRecords()
has been changed to a JavaMap
. Other related type changes have been made accordingly.