Skip to content

Commit

Permalink
Use Utils.closeQuietly to replace CoreUtils.swallow: all the closeables
Browse files Browse the repository at this point in the history
  • Loading branch information
bboyleonp666 committed Sep 4, 2024
1 parent 5944230 commit cbd1e54
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 49 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ object AclCommand extends Logging {
authZ.configure(authorizerProperties.asJava)
f(authZ)
}
finally CoreUtils.swallow(authZ.close(), this)
finally Utils.closeQuietly(authZ, "authorizer")
}

def addAcls(): Unit = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ object SocketServer {
channel: SocketChannel,
logging: Logging
): Unit = {
CoreUtils.swallow(channel.socket().close(), logging, Level.ERROR)
CoreUtils.swallow(channel.close(), logging, Level.ERROR)
Utils.closeQuietly(channel.socket(), "channel socket")
Utils.closeQuietly(channel, "channel")
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import kafka.raft.KafkaMetadataLog.RetentionMsBreach
import kafka.raft.KafkaMetadataLog.RetentionSizeBreach
import kafka.raft.KafkaMetadataLog.SnapshotDeletionReason
import kafka.raft.KafkaMetadataLog.UnknownReason
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Logging
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.server.common.RequestLocal
Expand Down Expand Up @@ -682,7 +682,7 @@ object KafkaMetadataLog extends Logging {
): Unit = {
expiredSnapshots.foreach { case (snapshotId, snapshotReader) =>
snapshotReader.foreach { reader =>
CoreUtils.swallow(reader.close(), logging)
Utils.closeQuietly(reader, "reader")
}
Snapshots.deleteIfExists(logDir, snapshotId)
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ class KafkaRaftManager[T](

def shutdown(): Unit = {
CoreUtils.swallow(expirationService.shutdown(), this)
CoreUtils.swallow(expirationTimer.close(), this)
Utils.closeQuietly(expirationTimer, "expiration timer")
CoreUtils.swallow(clientDriver.shutdown(), this)
CoreUtils.swallow(scheduler.shutdown(), this)
CoreUtils.swallow(netChannel.close(), this)
CoreUtils.swallow(replicatedLog.close(), this)
Utils.closeQuietly(netChannel, "net channel")
Utils.closeQuietly(replicatedLog, "replicated log")
CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
}

Expand Down
19 changes: 6 additions & 13 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{LogContext, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid}
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
Expand Down Expand Up @@ -66,15 +66,11 @@ case class ControllerMigrationSupport(
brokersRpcClient: LegacyPropagator
) {
def shutdown(logging: Logging): Unit = {
if (zkClient != null) {
CoreUtils.swallow(zkClient.close(), logging)
}
Utils.closeQuietly(zkClient, "zk client")
if (brokersRpcClient != null) {
CoreUtils.swallow(brokersRpcClient.shutdown(), logging)
}
if (migrationDriver != null) {
CoreUtils.swallow(migrationDriver.close(), logging)
}
Utils.closeQuietly(migrationDriver, "migration driver")
}
}

Expand Down Expand Up @@ -474,10 +470,8 @@ class ControllerServer(
// smoother transition.
sharedServer.ensureNotRaftLeader()
incarnationId = null
if (registrationManager != null) {
CoreUtils.swallow(registrationManager.close(), this)
registrationManager = null
}
Utils.closeQuietly(registrationManager, "registration manager")
registrationManager = null
if (registrationChannelManager != null) {
CoreUtils.swallow(registrationChannelManager.shutdown(), this)
registrationChannelManager = null
Expand Down Expand Up @@ -514,8 +508,7 @@ class ControllerServer(
CoreUtils.swallow(quotaManagers.shutdown(), this)
if (controller != null)
controller.close()
if (quorumControllerMetrics != null)
CoreUtils.swallow(quorumControllerMetrics.close(), this)
Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics")
CoreUtils.swallow(authorizer.foreach(_.close()), this)
createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
Expand Down
38 changes: 13 additions & 25 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kafka.server.metadata.BrokerServerMetrics
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoader
Expand Down Expand Up @@ -366,34 +366,22 @@ class SharedServer(
if (snapshotGenerator != null) {
CoreUtils.swallow(snapshotGenerator.beginShutdown(), this)
}
if (loader != null) {
CoreUtils.swallow(loader.close(), this)
loader = null
}
if (metadataLoaderMetrics != null) {
CoreUtils.swallow(metadataLoaderMetrics.close(), this)
metadataLoaderMetrics = null
}
if (snapshotGenerator != null) {
CoreUtils.swallow(snapshotGenerator.close(), this)
snapshotGenerator = null
}
Utils.closeQuietly(loader, "loader")
loader = null
Utils.closeQuietly(metadataLoaderMetrics, "metadata loader metrics")
metadataLoaderMetrics = null
Utils.closeQuietly(snapshotGenerator, "snapshot generator")
snapshotGenerator = null
if (raftManager != null) {
CoreUtils.swallow(raftManager.shutdown(), this)
raftManager = null
}
if (controllerServerMetrics != null) {
CoreUtils.swallow(controllerServerMetrics.close(), this)
controllerServerMetrics = null
}
if (brokerMetrics != null) {
CoreUtils.swallow(brokerMetrics.close(), this)
brokerMetrics = null
}
if (metrics != null) {
CoreUtils.swallow(metrics.close(), this)
metrics = null
}
Utils.closeQuietly(controllerServerMetrics, "controller server metrics")
controllerServerMetrics = null
Utils.closeQuietly(brokerMetrics, "broker metrics")
brokerMetrics = null
Utils.closeQuietly(metrics, "metrics")
metrics = null
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this)
started = false
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ class TestRaftServer(
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)
if (metrics != null)
CoreUtils.swallow(metrics.close(), this)
Utils.closeQuietly(metrics, "metrics")
shutdownLatch.countDown()
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import java.nio.file.Files
import java.text.MessageFormat
import java.util.{Locale, Properties, UUID}

<<<<<<< HEAD
import kafka.utils.{Exit, Logging}
=======
import kafka.utils.Logging
>>>>>>> 96e277cded (Use Utils.closeQuietly to replace CoreUtils.swallow: all the closeables)

import scala.jdk.CollectionConverters._
import org.apache.commons.lang.text.StrSubstitutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
<<<<<<< HEAD
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.RequestLocal
=======
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.common.utils.{Time, Utils}
>>>>>>> 96e277cded (Use Utils.closeQuietly to replace CoreUtils.swallow: all the closeables)
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
Expand Down

0 comments on commit cbd1e54

Please sign in to comment.