Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17285: Consider using Utils.closeQuietly to replace CoreUtils.swallow when handling Closeable objects #16843

Merged
merged 14 commits into from
Oct 3, 2024
Merged
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ object AclCommand extends Logging {
authZ.configure(authorizerProperties.asJava)
f(authZ)
}
finally CoreUtils.swallow(authZ.close(), this)
finally Utils.closeQuietly(authZ, "authorizer")
}

def addAcls(): Unit = {
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ object SocketServer {
channel: SocketChannel,
logging: Logging
): Unit = {
CoreUtils.swallow(channel.socket().close(), logging, Level.ERROR)
CoreUtils.swallow(channel.close(), logging, Level.ERROR)
Utils.closeQuietly(channel.socket, "channel socket")
Utils.closeQuietly(channel, "channel")
}
}

Expand Down Expand Up @@ -709,10 +709,8 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private def closeAll(): Unit = {
debug("Closing server socket, selector, and any throttled sockets.")
// The serverChannel will be null if Acceptor's thread is not started
if (serverChannel != null) {
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
}
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
Utils.closeQuietly(serverChannel, "Acceptor serverChannel")
Utils.closeQuietly(nioSelector, "Acceptor nioSelector")
throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket, this))
throttledSockets.clear()
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import kafka.raft.KafkaMetadataLog.RetentionMsBreach
import kafka.raft.KafkaMetadataLog.RetentionSizeBreach
import kafka.raft.KafkaMetadataLog.SnapshotDeletionReason
import kafka.raft.KafkaMetadataLog.UnknownReason
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Logging
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.server.common.RequestLocal
Expand Down Expand Up @@ -683,7 +683,7 @@ object KafkaMetadataLog extends Logging {
): Unit = {
expiredSnapshots.foreach { case (snapshotId, snapshotReader) =>
snapshotReader.foreach { reader =>
CoreUtils.swallow(reader.close(), logging)
Utils.closeQuietly(reader, "reader")
}
Snapshots.deleteIfExists(logDir, snapshotId)
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ class KafkaRaftManager[T](

def shutdown(): Unit = {
CoreUtils.swallow(expirationService.shutdown(), this)
CoreUtils.swallow(expirationTimer.close(), this)
Utils.closeQuietly(expirationTimer, "expiration timer")
CoreUtils.swallow(clientDriver.shutdown(), this)
CoreUtils.swallow(scheduler.shutdown(), this)
CoreUtils.swallow(netChannel.close(), this)
CoreUtils.swallow(replicatedLog.close(), this)
Utils.closeQuietly(netChannel, "net channel")
Utils.closeQuietly(replicatedLog, "replicated log")
CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
}

Expand Down
14 changes: 6 additions & 8 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
Expand Down Expand Up @@ -712,7 +712,7 @@ class BrokerServer(
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))

/**
* We must shutdown the scheduler early because otherwise, the scheduler could touch other
Expand Down Expand Up @@ -757,23 +757,21 @@ class BrokerServer(

// Close remote log manager to give a chance to any of its underlying clients
// (especially in RemoteStorageManager and RemoteLogMetadataManager) to close gracefully.
CoreUtils.swallow(remoteLogManagerOpt.foreach(_.close()), this)
remoteLogManagerOpt.foreach(Utils.closeQuietly(_, "remote log manager"))

if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)

if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)
if (brokerTopicStats != null)
CoreUtils.swallow(brokerTopicStats.close(), this)
if (sharePartitionManager != null)
CoreUtils.swallow(sharePartitionManager.close(), this)
Utils.closeQuietly(brokerTopicStats, "broker topic stats")
Utils.closeQuietly(sharePartitionManager, "share partition manager")

isShuttingDown.set(false)

CoreUtils.swallow(lifecycleManager.close(), this)
CoreUtils.swallow(config.dynamicConfig.clear(), this)
CoreUtils.swallow(clientMetricsManager.close(), this)
Utils.closeQuietly(clientMetricsManager, "client metrics manager")
sharedServer.stopForBroker()
info("shut down completed")
} catch {
Expand Down
46 changes: 16 additions & 30 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{LogContext, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid}
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
Expand Down Expand Up @@ -66,15 +66,11 @@ case class ControllerMigrationSupport(
brokersRpcClient: LegacyPropagator
) {
def shutdown(logging: Logging): Unit = {
if (zkClient != null) {
CoreUtils.swallow(zkClient.close(), logging)
}
Utils.closeQuietly(zkClient, "zk client")
if (brokersRpcClient != null) {
CoreUtils.swallow(brokersRpcClient.shutdown(), logging)
}
if (migrationDriver != null) {
CoreUtils.swallow(migrationDriver.close(), logging)
}
Utils.closeQuietly(migrationDriver, "migration driver")
}
}

Expand Down Expand Up @@ -475,10 +471,8 @@ class ControllerServer(
// smoother transition.
sharedServer.ensureNotRaftLeader()
incarnationId = null
if (registrationManager != null) {
CoreUtils.swallow(registrationManager.close(), this)
registrationManager = null
}
Utils.closeQuietly(registrationManager, "registration manager")
registrationManager = null
if (registrationChannelManager != null) {
CoreUtils.swallow(registrationChannelManager.shutdown(), this)
registrationChannelManager = null
Expand All @@ -488,18 +482,12 @@ class ControllerServer(
if (metadataCache != null) {
metadataCache = null
}
if (metadataCachePublisher != null) {
metadataCachePublisher.close()
metadataCachePublisher = null
}
if (featuresPublisher != null) {
featuresPublisher.close()
featuresPublisher = null
}
if (registrationsPublisher != null) {
registrationsPublisher.close()
registrationsPublisher = null
}
Utils.closeQuietly(metadataCachePublisher, "metadata cache publisher")
metadataCachePublisher = null
Utils.closeQuietly(featuresPublisher, "features publisher")
featuresPublisher = null
Utils.closeQuietly(registrationsPublisher, "registrations publisher")
registrationsPublisher = null
bboyleonp666 marked this conversation as resolved.
Show resolved Hide resolved
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
migrationSupport.foreach(_.shutdown(this))
Expand All @@ -513,13 +501,11 @@ class ControllerServer(
CoreUtils.swallow(controllerApis.close(), this)
if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
if (controller != null)
controller.close()
if (quorumControllerMetrics != null)
CoreUtils.swallow(quorumControllerMetrics.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
Utils.closeQuietly(controller, "controller")
Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics")
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create topic policy"))
alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter config policy"))
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
CoreUtils.swallow(config.dynamicConfig.clear(), this)
sharedServer.stopForController()
Expand Down
13 changes: 5 additions & 8 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ class KafkaServer(
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown(), this)

Expand Down Expand Up @@ -1048,13 +1048,12 @@ class KafkaServer(
// Close remote log manager before stopping processing requests, to give a chance to any
// of its underlying clients (especially in RemoteStorageManager and RemoteLogMetadataManager)
// to close gracefully.
CoreUtils.swallow(remoteLogManagerOpt.foreach(_.close()), this)
remoteLogManagerOpt.foreach(Utils.closeQuietly(_, "remote log manager"))

if (featureChangeListener != null)
CoreUtils.swallow(featureChangeListener.close(), this)

if (zkClient != null)
CoreUtils.swallow(zkClient.close(), this)
Utils.closeQuietly(zkClient, "zk client")

if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
Expand All @@ -1065,10 +1064,8 @@ class KafkaServer(
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)
unregisterCurrentControllerIdMetric()
if (metrics != null)
CoreUtils.swallow(metrics.close(), this)
if (brokerTopicStats != null)
CoreUtils.swallow(brokerTopicStats.close(), this)
Utils.closeQuietly(metrics, "metrics")
Utils.closeQuietly(brokerTopicStats, "broker topic stats")

// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
Expand Down
38 changes: 13 additions & 25 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.server.Server.MetricsPrefix
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoader
Expand Down Expand Up @@ -365,34 +365,22 @@ class SharedServer(
if (snapshotGenerator != null) {
CoreUtils.swallow(snapshotGenerator.beginShutdown(), this)
}
if (loader != null) {
CoreUtils.swallow(loader.close(), this)
loader = null
}
if (metadataLoaderMetrics != null) {
CoreUtils.swallow(metadataLoaderMetrics.close(), this)
metadataLoaderMetrics = null
}
if (snapshotGenerator != null) {
CoreUtils.swallow(snapshotGenerator.close(), this)
snapshotGenerator = null
}
Utils.closeQuietly(loader, "loader")
loader = null
Utils.closeQuietly(metadataLoaderMetrics, "metadata loader metrics")
metadataLoaderMetrics = null
Utils.closeQuietly(snapshotGenerator, "snapshot generator")
snapshotGenerator = null
bboyleonp666 marked this conversation as resolved.
Show resolved Hide resolved
if (raftManager != null) {
CoreUtils.swallow(raftManager.shutdown(), this)
raftManager = null
}
if (controllerServerMetrics != null) {
CoreUtils.swallow(controllerServerMetrics.close(), this)
controllerServerMetrics = null
}
if (brokerMetrics != null) {
CoreUtils.swallow(brokerMetrics.close(), this)
brokerMetrics = null
}
if (metrics != null) {
CoreUtils.swallow(metrics.close(), this)
metrics = null
}
Utils.closeQuietly(controllerServerMetrics, "controller server metrics")
controllerServerMetrics = null
Utils.closeQuietly(brokerMetrics, "broker metrics")
brokerMetrics = null
Utils.closeQuietly(metrics, "metrics")
metrics = null
bboyleonp666 marked this conversation as resolved.
Show resolved Hide resolved
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this)
started = false
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.common.utils.{Sanitizer, Utils}
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ZooKeeperInternals}
import org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG
Expand Down Expand Up @@ -547,8 +547,8 @@ class ZkAdminManager(val config: KafkaConfig,

def shutdown(): Unit = {
topicPurgatory.shutdown()
CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this)
createTopicPolicy.foreach(Utils.closeQuietly(_, "create topic policy"))
alterConfigPolicy.foreach(Utils.closeQuietly(_, "alter config policy"))
}

private def resourceNameToBrokerId(resourceName: String): Int = {
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ class TestRaftServer(
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)
if (metrics != null)
CoreUtils.swallow(metrics.close(), this)
Utils.closeQuietly(metrics, "metrics")
shutdownLatch.countDown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.clients.consumer.GroupProtocol
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
Expand Down Expand Up @@ -79,7 +79,7 @@ class ZooKeeperQuorumImplementation(
}

override def shutdown(): Unit = {
CoreUtils.swallow(zkClient.close(), log)
Utils.closeQuietly(zkClient, "zk client")
CoreUtils.swallow(zookeeper.shutdown(), log)
}
}
Expand Down Expand Up @@ -422,7 +422,7 @@ abstract class QuorumTestHarness extends Logging {
} catch {
case t: Throwable =>
CoreUtils.swallow(zookeeper.shutdown(), this)
if (zkClient != null) CoreUtils.swallow(zkClient.close(), this)
Utils.closeQuietly(zkClient, "zk client")
throw t
}
new ZooKeeperQuorumImplementation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
Expand Down Expand Up @@ -71,7 +71,7 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] extend
def tearDown(): Unit = {
CoreUtils.swallow(replicaManager.shutdown(false), this)
CoreUtils.swallow(executor.shutdownNow(), this)
CoreUtils.swallow(timer.close(), this)
Utils.closeQuietly(timer, "mock timer")
CoreUtils.swallow(scheduler.shutdown(), this)
CoreUtils.swallow(time.scheduler.shutdown(), this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.common.RequestLocal
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -97,7 +97,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
override def tearDown(): Unit = {
try {
CoreUtils.swallow(groupCoordinator.shutdown(), this)
CoreUtils.swallow(metrics.close(), this)
Utils.closeQuietly(metrics, "metrics")
} finally {
super.tearDown()
}
Expand Down
Loading