KAFKA-19144 Move DelayedProduce to server module #19793
ReplicaManager.scala
@@ -50,6 +50,7 @@ import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, Trans
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.DelayedProduce.{ProduceMetadata, ProducePartitionStatus}
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.config.ReplicationConfigs
@@ -61,7 +62,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, DelayedProduce, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -76,6 +77,7 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import java.util.function.Consumer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
+import scala.jdk.FunctionConverters.enrichAsJavaConsumer
 import scala.jdk.OptionConverters.{RichOption, RichOptional}

 /*
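The added FunctionConverters import is what later lets a plain Scala callback cross into the Java-side DelayedProduce as a java.util.function.Consumer (see responseCallback.asJava below). A minimal, self-contained illustration of that conversion:

    import scala.jdk.FunctionConverters.enrichAsJavaConsumer

    val log: String => Unit = line => println(s"got: $line")

    // enrichAsJavaConsumer adds asJava to A => Unit functions,
    // producing a java.util.function.Consumer[A].
    val javaLog: java.util.function.Consumer[String] = log.asJava
    javaLog.accept("hello")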
@@ -732,7 +734,7 @@ class ReplicaManager(val config: KafkaConfig,
                     internalTopicsAllowed: Boolean,
                     origin: AppendOrigin,
                     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
-                    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+                    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
                     recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.noCaching,
                     verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
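With this change the callback handed to appendRecords receives a java.util.Map instead of a Scala Map, so Scala implementations view it with asScala and Scala call sites convert with asJava, as the hunks below do. A minimal sketch of both directions (placeholder String/Int types stand in for TopicIdPartition and PartitionResponse):

    import scala.jdk.CollectionConverters._

    def responseCallback(responses: java.util.Map[String, Int]): Unit =
      responses.asScala.foreach { case (tp, err) => println(s"$tp -> $err") }

    // A Scala call site builds a Scala Map and converts once at the boundary.
    responseCallback(Map("topic-0" -> 0, "topic-1" -> 0).asJava)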
@@ -846,8 +848,8 @@ class ReplicaManager(val config: KafkaConfig,

     val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus }

-    def newResponseCallback(responses: Map[TopicIdPartition, PartitionResponse]): Unit = {
-      responseCallback(preAppendPartitionResponses ++ responses)
+    def newResponseCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = {
+      responseCallback(preAppendPartitionResponses ++ responses.asScala)
     }

     appendRecords(
@@ -922,7 +924,7 @@
     results: Map[TopicIdPartition, LogAppendResult]
   ): Map[TopicIdPartition, ProducePartitionStatus] = {
     results.map { case (topicIdPartition, result) =>
-      topicIdPartition -> ProducePartitionStatus(
+      topicIdPartition -> new ProducePartitionStatus(
         result.info.lastOffset + 1, // required offset
         new PartitionResponse(
           result.error,
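The only change in this hunk is the explicit `new`: ProducePartitionStatus, presumably now a plain Java class in the server module rather than a Scala case class, no longer has a generated companion apply. A toy illustration of why the call site changes (not the Kafka definitions):

    // A case class gets a companion apply, so "new" is optional.
    case class StatusScala(requiredOffset: Long)
    val s1 = StatusScala(42L)

    // A plain (Java-style) class has no apply, so "new" is required.
    class StatusJava(val requiredOffset: Long)
    val s2 = new StatusJava(42L)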
@@ -967,12 +969,30 @@
     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
     initialAppendResults: Map[TopicIdPartition, LogAppendResult],
     initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
-    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
   ): Unit = {
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
       // create delayed produce operation
-      val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
-      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback)
+      val produceMetadata = new ProduceMetadata(requiredAcks, initialProduceStatus.asJava)
+
+      def delegate(tp: TopicPartition, status: ProducePartitionStatus): Unit = {
+        val (hasEnough, error) = getPartitionOrError(tp) match {
+          case Left(err) =>
+            // Case A
+            (false, err)
+
+          case Right(partition) =>
+            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
+        }
+
+        // Case B || C.1 || C.2
+        if (error != Errors.NONE || hasEnough) {
+          status.setAcksPending(false)
+          status.responseStatus.error = error
+        }
+      }
+
+      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, delegate, responseCallback.asJava)

       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList

Review comment (on the `// Case B || C.1 || C.2` line): ditto.
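Worth noting about the construction change: the PR passes a local delegate method instead of `this`, so the server-module DelayedProduce no longer depends on ReplicaManager. Assuming the new Java constructor accepts the per-partition check as a functional interface such as BiConsumer (the exact Java signature is not shown in this diff), the Scala method is adapted by SAM conversion at the call site. A toy sketch of that mechanism, with hypothetical names:

    import java.util.function.BiConsumer

    // Stand-in for a Java-module class that takes its check as a SAM type.
    final class DelayedOp(check: BiConsumer[String, Long]) {
      def tryComplete(): Unit = check.accept("topic-0", 42L)
    }

    def delegate(tp: String, requiredOffset: Long): Unit =
      println(s"checking $tp up to offset $requiredOffset")

    // A function literal SAM-converts to BiConsumer at the call site.
    new DelayedOp(delegate(_, _)).tryComplete()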
@@ -983,23 +1003,25 @@
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.asJava)
     } else {
       // we can respond immediately
-      val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
+      val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
+      initialProduceStatus.foreach { case (k, status) => k -> produceResponseStatus.put(k, status.responseStatus) }

       responseCallback(produceResponseStatus)
     }
   }

Review comment (on lines +1006 to +1007): Suggested change: […]
Review comment (same lines): I think it's ok to use […]
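The suggestion body did not survive this capture. Given the second comment, it very likely proposed keeping the original functional style and converting once with asJava rather than filling a HashMap by hand; note also that the `k ->` in the new foreach builds a tuple that is immediately discarded. A hypothetical reconstruction:

    import scala.jdk.CollectionConverters._

    // Hypothetical form only; the confirmed suggestion text was lost.
    val produceResponseStatus =
      initialProduceStatus.map { case (k, status) => k -> status.responseStatus }.asJava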

   private def sendInvalidRequiredAcksResponse(
     entries: Map[TopicIdPartition, MemoryRecords],
-    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit): Unit = {
+    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit): Unit = {
     // If required.acks is outside accepted range, something is wrong with the client
     // Just return an error and don't handle the request at all
-    val responseStatus = entries.map { case (topicIdPartition, _) =>
-      topicIdPartition -> new PartitionResponse(
-        Errors.INVALID_REQUIRED_ACKS,
-        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
-        RecordBatch.NO_TIMESTAMP,
-        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
-      )
+    val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
+    entries.foreach { case (topicIdPartition, _) =>
+      responseStatus.put(topicIdPartition, new PartitionResponse(
+        Errors.INVALID_REQUIRED_ACKS,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+        RecordBatch.NO_TIMESTAMP,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset)
+      )
     }
     responseCallback(responseStatus)
   }
LocalLeaderEndPointTest.scala
@@ -40,7 +40,7 @@ import org.junit.jupiter.api.Assertions._
 import org.mockito.Mockito.mock

 import java.io.File
-import java.util.{Map => JMap}
+import java.util.{Optional, Map => JMap}
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
@@ -271,9 +271,10 @@ class LocalLeaderEndPointTest extends Logging {
     origin: AppendOrigin = AppendOrigin.CLIENT,
     requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
-    def appendCallback(responses: scala.collection.Map[TopicIdPartition, PartitionResponse]): Unit = {
-      val response = responses.get(partition)
-      assertTrue(response.isDefined)
+    def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = {
+      val response = Optional.ofNullable(responses.get(partition))
+
+      assertTrue(response.isPresent)
       result.fire(response.get)
     }

Review comment (on the new appendCallback signature): Given that we already have an import alias for […] Suggested change: […]
Review comment (on lines +275 to 278): Suggested change: […]
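Both suggestion bodies were lost in this capture. The first almost certainly swaps the spelled-out java.util.Map for the JMap alias imported above; the second plausibly drops the Optional round-trip in favor of a plain null assertion. Hypothetical reconstructions:

-    def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = {
+    def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]): Unit = {

-      val response = Optional.ofNullable(responses.get(partition))
-      assertTrue(response.isPresent)
-      result.fire(response.get)
+      val response = responses.get(partition)
+      assertNotNull(response)
+      result.fire(response)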
TransactionStateManagerTest.scala
@@ -1104,7 +1104,7 @@ class TransactionStateManagerTest {
     capturedAppends: mutable.Map[TopicIdPartition, mutable.Buffer[MemoryRecords]]
   ): Unit = {
     val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit])
+    val callbackCapture: ArgumentCaptor[java.util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[java.util.Map[TopicIdPartition, PartitionResponse] => Unit])

     when(replicaManager.appendRecords(
       anyLong(),

Review comment (on the new callbackCapture line): please import […]
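The truncated request presumably asks for an import alias so java.util.Map need not be spelled out at each use, matching the JMap convention in LocalLeaderEndPointTest. A hypothetical form:

    import java.util.{Map => JMap}

    val callbackCapture: ArgumentCaptor[JMap[TopicIdPartition, PartitionResponse] => Unit] =
      ArgumentCaptor.forClass(classOf[JMap[TopicIdPartition, PartitionResponse] => Unit])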
@@ -1127,7 +1127,7 @@
         batches += records

         topicPartition -> new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP, 0L)
-      }.toMap
+      }.toMap.asJava
     ))
   }
@@ -1258,7 +1258,7 @@
   private def prepareForTxnMessageAppend(error: Errors): Unit = {
     reset(replicaManager)

-    val capturedArgument: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit])
+    val capturedArgument: ArgumentCaptor[java.util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[java.util.Map[TopicIdPartition, PartitionResponse] => Unit])
     when(replicaManager.appendRecords(anyLong(),
       anyShort(),
       internalTopicsAllowed = ArgumentMatchers.eq(true),
@@ -1270,7 +1270,7 @@
       any()
     )).thenAnswer(_ => capturedArgument.getValue.apply(
       Map(new TopicIdPartition(transactionTopicId, partitionId, TRANSACTION_STATE_TOPIC_NAME) ->
-        new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+        new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)).asJava)
     )
     when(replicaManager.topicIdPartition(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 0))).thenReturn(new TopicIdPartition(transactionTopicId, 0, TRANSACTION_STATE_TOPIC_NAME))
     when(replicaManager.topicIdPartition(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 1))).thenReturn(new TopicIdPartition(transactionTopicId, 1, TRANSACTION_STATE_TOPIC_NAME))
KafkaApisTest.scala
@@ -2815,7 +2815,7 @@
     val expectedErrors = Map(tp1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION, tp2 -> Errors.NONE).asJava

     val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
-    val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] => Unit])

     when(replicaManager.onlinePartition(tp1))
       .thenReturn(None)
@@ -2832,7 +2832,7 @@
       any(),
       ArgumentMatchers.eq(requestLocal),
       any()
-    )).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
+    )).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)).asJava))

     kafkaApis = createKafkaApis()
     kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal)
     verify(requestChannel).sendResponse(

Review comment (on the changed thenAnswer line): Suggested change: […]
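This suggestion body was lost as well. A plausible shape, building the single-entry Java map directly with util.Map.of instead of converting a Scala Map (hypothetical):

    )).thenAnswer(_ => responseCallback.getValue.apply(
      util.Map.of(new TopicIdPartition(topicId, tp2), new PartitionResponse(Errors.NONE))))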
@@ -2952,8 +2952,8 @@

     val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] => Unit])

     when(replicaManager.appendRecords(
       ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
@@ -2969,7 +2969,7 @@
       responseCallback.getValue.apply(
         entriesPerPartition.getValue.keySet.map { tp =>
           tp -> new PartitionResponse(Errors.NONE)
-        }.toMap
+        }.toMap.asJava
       )
     }
     kafkaApis = createKafkaApis()
Review comment: What does this comment mean?

Review comment: I see this is relevant to the comment in DelayedProduce, but it's confusing that these comments are standalone here. Could you write out the whole meaning of these cases, or link these comments to tryComplete?
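For readers following this thread: the Case labels refer to the per-partition completion conditions documented on DelayedProduce.tryComplete, which the new delegate now evaluates on the ReplicaManager side. Read directly off the code in this PR (a paraphrase, not the authoritative scaladoc): Case A is getPartitionOrError returning Left, i.e. the broker no longer hosts the partition; the combined check then completes the partition either because checkEnoughReplicasReachOffset reported an error (Case B) or because enough replicas have reached the required offset (the C cases). A sketch of the rule as a single predicate, assuming the enclosing ReplicaManager scope:

    // Paraphrased completion rule for one partition (not verbatim Kafka doc).
    def partitionSatisfied(tp: TopicPartition, status: ProducePartitionStatus): Boolean =
      getPartitionOrError(tp) match {
        case Left(_) =>
          true // Case A: partition unknown here -> complete with the lookup error
        case Right(partition) =>
          val (hasEnough, error) = partition.checkEnoughReplicasReachOffset(status.requiredOffset)
          error != Errors.NONE || hasEnough // Case B: check errored; Case C: offset fully replicated
      }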