Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17285: Consider using Utils.closeQuietly to replace CoreUtils.swallow when handling Closeable objects #16843

Merged
merged 14 commits into from
Oct 3, 2024
Merged
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ object AclCommand extends Logging {
authZ.configure(authorizerProperties.asJava)
f(authZ)
}
finally CoreUtils.swallow(authZ.close(), this)
finally Utils.closeQuietly(authZ, "authorizer")
}

def addAcls(): Unit = {
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ object SocketServer {
channel: SocketChannel,
logging: Logging
): Unit = {
CoreUtils.swallow(channel.socket().close(), logging, Level.ERROR)
CoreUtils.swallow(channel.close(), logging, Level.ERROR)
Utils.closeQuietly(channel.socket, "channel socket")
Utils.closeQuietly(channel, "channel")
}
}

Expand Down Expand Up @@ -709,10 +709,8 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private def closeAll(): Unit = {
debug("Closing server socket, selector, and any throttled sockets.")
// The serverChannel will be null if Acceptor's thread is not started
if (serverChannel != null) {
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
}
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
Utils.closeQuietly(serverChannel, "Acceptor serverChannel")
Utils.closeQuietly(nioSelector, "Acceptor nioSelector")
throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket, this))
throttledSockets.clear()
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import kafka.raft.KafkaMetadataLog.RetentionMsBreach
import kafka.raft.KafkaMetadataLog.RetentionSizeBreach
import kafka.raft.KafkaMetadataLog.SnapshotDeletionReason
import kafka.raft.KafkaMetadataLog.UnknownReason
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Logging
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.server.common.RequestLocal
Expand Down Expand Up @@ -682,7 +682,7 @@ object KafkaMetadataLog extends Logging {
): Unit = {
expiredSnapshots.foreach { case (snapshotId, snapshotReader) =>
snapshotReader.foreach { reader =>
CoreUtils.swallow(reader.close(), logging)
Utils.closeQuietly(reader, "reader")
}
Snapshots.deleteIfExists(logDir, snapshotId)
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ class KafkaRaftManager[T](

def shutdown(): Unit = {
CoreUtils.swallow(expirationService.shutdown(), this)
CoreUtils.swallow(expirationTimer.close(), this)
Utils.closeQuietly(expirationTimer, "expiration timer")
CoreUtils.swallow(clientDriver.shutdown(), this)
CoreUtils.swallow(scheduler.shutdown(), this)
CoreUtils.swallow(netChannel.close(), this)
CoreUtils.swallow(replicatedLog.close(), this)
Utils.closeQuietly(netChannel, "net channel")
Utils.closeQuietly(replicatedLog, "replicated log")
CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
Expand Down Expand Up @@ -673,7 +673,7 @@ class BrokerServer(
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))

/**
* We must shutdown the scheduler early because otherwise, the scheduler could touch other
Expand Down Expand Up @@ -716,7 +716,7 @@ class BrokerServer(

// Close remote log manager to give a chance to any of its underlying clients
// (especially in RemoteStorageManager and RemoteLogMetadataManager) to close gracefully.
CoreUtils.swallow(remoteLogManagerOpt.foreach(_.close()), this)
remoteLogManagerOpt.foreach(Utils.closeQuietly(_, "remote log manager option"))
bboyleonp666 marked this conversation as resolved.
Show resolved Hide resolved

if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
Expand Down
16 changes: 6 additions & 10 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{LogContext, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid}
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
Expand Down Expand Up @@ -66,15 +66,11 @@ case class ControllerMigrationSupport(
brokersRpcClient: LegacyPropagator
) {
def shutdown(logging: Logging): Unit = {
if (zkClient != null) {
CoreUtils.swallow(zkClient.close(), logging)
}
Utils.closeQuietly(zkClient, "zk client")
if (brokersRpcClient != null) {
CoreUtils.swallow(brokersRpcClient.shutdown(), logging)
}
if (migrationDriver != null) {
CoreUtils.swallow(migrationDriver.close(), logging)
}
Utils.closeQuietly(migrationDriver, "migration driver")
}
}

Expand Down Expand Up @@ -516,9 +512,9 @@ class ControllerServer(
controller.close()
if (quorumControllerMetrics != null)
bboyleonp666 marked this conversation as resolved.
Show resolved Hide resolved
CoreUtils.swallow(quorumControllerMetrics.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "topic policy"))
bboyleonp666 marked this conversation as resolved.
Show resolved Hide resolved
alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter config policy"))
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
CoreUtils.swallow(config.dynamicConfig.clear(), this)
sharedServer.stopForController()
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ class KafkaServer(
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown(), this)

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.common.utils.{Sanitizer, Utils}
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ZooKeeperInternals}
import org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG
Expand Down Expand Up @@ -547,8 +547,8 @@ class ZkAdminManager(val config: KafkaConfig,

def shutdown(): Unit = {
topicPurgatory.shutdown()
CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this)
createTopicPolicy.foreach(Utils.closeQuietly(_, "topic policy"))
bboyleonp666 marked this conversation as resolved.
Show resolved Hide resolved
alterConfigPolicy.foreach(Utils.closeQuietly(_, "alter config policy"))
}

private def resourceNameToBrokerId(resourceName: String): Int = {
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ class TestRaftServer(
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)
if (metrics != null)
CoreUtils.swallow(metrics.close(), this)
Utils.closeQuietly(metrics, "metrics")
shutdownLatch.countDown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.clients.consumer.GroupProtocol
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
Expand Down Expand Up @@ -79,7 +79,7 @@ class ZooKeeperQuorumImplementation(
}

override def shutdown(): Unit = {
CoreUtils.swallow(zkClient.close(), log)
Utils.closeQuietly(zkClient, "zk client")
CoreUtils.swallow(zookeeper.shutdown(), log)
}
}
Expand Down Expand Up @@ -422,7 +422,7 @@ abstract class QuorumTestHarness extends Logging {
} catch {
case t: Throwable =>
CoreUtils.swallow(zookeeper.shutdown(), this)
if (zkClient != null) CoreUtils.swallow(zkClient.close(), this)
Utils.closeQuietly(zkClient, "zk client")
throw t
}
new ZooKeeperQuorumImplementation(
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.nio.file.Files
import java.text.MessageFormat
import java.util.{Locale, Properties, UUID}

import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Logging

import scala.jdk.CollectionConverters._
import org.apache.commons.lang.text.StrSubstitutor
Expand Down Expand Up @@ -206,7 +206,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
builder.append(line).append("\n")
addEntriesToDirectoryService(StrSubstitutor.replace(builder, map.asJava))
}
finally CoreUtils.swallow(reader.close(), this)
finally Utils.closeQuietly(reader, "miniKdc reader")
}

val bindAddress = config.getProperty(MiniKdc.KdcBindAddress)
Expand Down Expand Up @@ -255,7 +255,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
while ({line = reader.readLine(); line != null}) {
stringBuilder.append(line).append("{3}")
}
} finally CoreUtils.swallow(reader.close(), this)
} finally Utils.closeQuietly(reader, "krb5 conf stream reader")
val output = MessageFormat.format(stringBuilder.toString, realm, host, port.toString, System.lineSeparator())
Files.write(krb5conf.toPath, output.getBytes(StandardCharsets.UTF_8))
}
Expand Down Expand Up @@ -344,7 +344,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
try {
for (ldifEntry <- reader.asScala)
ds.getAdminSession.add(new DefaultEntry(ds.getSchemaManager, ldifEntry.getEntry))
} finally CoreUtils.swallow(reader.close(), this)
} finally Utils.closeQuietly(reader, "ldif reader")
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
Expand Down Expand Up @@ -71,7 +71,7 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] extend
def tearDown(): Unit = {
CoreUtils.swallow(replicaManager.shutdown(false), this)
CoreUtils.swallow(executor.shutdownNow(), this)
CoreUtils.swallow(timer.close(), this)
Utils.closeQuietly(timer, "mock timer")
CoreUtils.swallow(scheduler.shutdown(), this)
CoreUtils.swallow(time.scheduler.shutdown(), this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.common.RequestLocal
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -97,7 +97,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
override def tearDown(): Unit = {
try {
CoreUtils.swallow(groupCoordinator.shutdown(), this)
CoreUtils.swallow(metrics.close(), this)
Utils.closeQuietly(metrics, "metrics")
} finally {
super.tearDown()
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package kafka.server

import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs}
Expand Down Expand Up @@ -59,7 +60,7 @@ class KafkaServerTest extends QuorumTestHarness {
"Expected RuntimeException due to address already in use during KafkaServer startup"
)
} finally {
CoreUtils.swallow(serverSocket.close(), this)
Utils.closeQuietly(serverSocket, "server socket")
TestUtils.shutdownServers(kafkaServer.toList)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
Expand Down Expand Up @@ -75,7 +75,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
CoreUtils.swallow(channel.shutdown(), this)
CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
CoreUtils.swallow(quotaManagers.shutdown(), this)
CoreUtils.swallow(metrics.close(), this)
Utils.closeQuietly(metrics, "metrics")
CoreUtils.swallow(time.scheduler.shutdown(), this)
}

Expand Down