Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
LoganZhuZzz committed Jun 28, 2024
2 parents 2d06142 + e57cbe0 commit 1ad9d29
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -936,9 +936,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }

var unconvertedFetchResponse: FetchResponse = null

def createResponse(throttleTimeMs: Int): FetchResponse = {
def createResponse(throttleTimeMs: Int, unconvertedFetchResponse: FetchResponse): FetchResponse = {
// Down-convert messages for each partition if required
val convertedData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
unconvertedFetchResponse.data().responses().forEach { topicResponse =>
Expand Down Expand Up @@ -982,13 +980,13 @@ class KafkaApis(val requestChannel: RequestChannel,

if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
quotas.leader.record(responseSize)
val responsePartitionsSize = unconvertedFetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum()
trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " +
s"metadata=${unconvertedFetchResponse.sessionId}")
requestHelper.sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats))
requestHelper.sendResponseExemptThrottle(request, createResponse(0, unconvertedFetchResponse), Some(updateConversionStats))
} else {
// Fetch size used to determine throttle time is calculated before any down conversions.
// This may be slightly different from the actual response size. But since down conversions
Expand All @@ -1003,7 +1001,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)

val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
if (maxThrottleTimeMs > 0) {
val unconvertedFetchResponse = if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs
// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
Expand All @@ -1014,17 +1012,18 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
}
// If throttling is required, return an empty response.
unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
fetchContext.getThrottledResponse(maxThrottleTimeMs)
} else {
// Get the actual response. This will update the fetch context.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val responsePartitionsSize = unconvertedFetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum()
trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " +
s"metadata=${unconvertedFetchResponse.sessionId}")
unconvertedFetchResponse
}

// Send the response immediately.
requestChannel.sendResponse(request, createResponse(maxThrottleTimeMs), Some(updateConversionStats))
requestChannel.sendResponse(request, createResponse(maxThrottleTimeMs, unconvertedFetchResponse), Some(updateConversionStats))
}
}

Expand Down

0 comments on commit 1ad9d29

Please sign in to comment.