Skip to content

Commit

Permalink
fix(core): ensure partition closed before metadata listener close (#1067
Browse files Browse the repository at this point in the history
)

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Mar 31, 2024
1 parent 1dc877b commit 676cec6
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,16 @@ class BrokerServer(
}
}

// AutoMQ for Kafka inject start
// https://github.com/AutoMQ/automq-for-kafka/issues/540
// await partition shutdown:
// 1. after lifecycleManager start shutdown to trigger partitions gracefully reassign.
// 2. before metadataListener start close to ensure S3Stream can read the latest metadata.
if (replicaManager != null) {
CoreUtils.swallow(replicaManager.awaitAllPartitionShutdown(), this)
}
// AutoMQ for Kafka inject end

if (metadataListener != null)
metadataListener.beginShutdown()

Expand All @@ -547,14 +557,6 @@ class BrokerServer(
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)

// AutoMQ for Kafka inject start
// https://github.com/AutoMQ/automq-for-kafka/issues/540
// await partition shutdown before metadataListener.close()
if (replicaManager != null) {
CoreUtils.swallow(replicaManager.awaitAllPartitionShutdown(), this)
}
// AutoMQ for Kafka inject end

if (metadataListener != null) {
CoreUtils.swallow(metadataListener.close(), this)
}
Expand Down

0 comments on commit 676cec6

Please sign in to comment.