KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig #16458

Merged · 10 commits · Jul 1, 2024
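The whole change applies a single pattern: rather than KafkaConfig eagerly parsing every group coordinator property into its own val and call sites threading those values through a long positional constructor, KafkaConfig now owns one GroupCoordinatorConfig built from itself (KafkaConfig is an AbstractConfig), and call sites read properties via config.groupCoordinatorConfig. A minimal sketch of that shape, assuming only the accessor and constant names visible in this diff (the refactored class itself is not shown on this page):

class GroupCoordinatorConfig(config: AbstractConfig) {
  // Each accessor reads the already-parsed value on demand, so the long
  // positional constructor deleted from BrokerServer below is unnecessary.
  def offsetsTopicPartitions: Int =
    config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)
  def offsetMetadataMaxSize: Int =
    config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG)
  // ...one accessor per offsets, classic-group, and consumer-group property
}

Call sites then change mechanically: config.offsetsTopicPartitions becomes config.groupCoordinatorConfig.offsetsTopicPartitions, as every hunk below shows.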
28 changes: 14 additions & 14 deletions core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1770,16 +1770,16 @@ object GroupCoordinator {

@nowarn("cat=deprecation")
private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
-    config.offsetMetadataMaxSize,
-    config.offsetsLoadBufferSize,
-    config.offsetsRetentionMinutes * 60L * 1000L,
-    config.offsetsRetentionCheckIntervalMs,
-    config.offsetsTopicPartitions,
-    config.offsetsTopicSegmentBytes,
-    config.offsetsTopicReplicationFactor,
-    config.offsetsTopicCompressionType,
-    config.offsetCommitTimeoutMs,
-    config.offsetCommitRequiredAcks
+    config.groupCoordinatorConfig.offsetMetadataMaxSize,
+    config.groupCoordinatorConfig.offsetsLoadBufferSize,
+    config.groupCoordinatorConfig.offsetsRetentionMs,
+    config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs,
+    config.groupCoordinatorConfig.offsetsTopicPartitions,
+    config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
+    config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
+    config.groupCoordinatorConfig.offsetTopicCompressionType,
+    config.groupCoordinatorConfig.offsetCommitTimeoutMs,
+    config.groupCoordinatorConfig.offsetCommitRequiredAcks
)

private[group] def apply(
@@ -1791,10 +1791,10 @@
metrics: Metrics
): GroupCoordinator = {
val offsetConfig = this.offsetConfig(config)
-    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
-      groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
-      groupMaxSize = config.groupMaxSize,
-      groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
+    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMinSessionTimeoutMs,
+      groupMaxSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs,
+      groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize,
+      groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs)

val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
offsetConfig, replicaManager, time, metrics)
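One detail in the hunk above: the old call site converted minutes to milliseconds inline (config.offsetsRetentionMinutes * 60L * 1000L), while the new code calls an accessor that is already in milliseconds. That implies the conversion now happens once inside GroupCoordinatorConfig; continuing the sketch near the top of this page (an assumption about the internals, not the merged code):

def offsetsRetentionMs: Long = {
  // Sketch only: fold the minutes-to-millis conversion into the config class.
  val minutes: Int = config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG)
  minutes * 60L * 1000L
}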
@@ -234,8 +234,8 @@ class DefaultAutoTopicCreationManager(
case GROUP_METADATA_TOPIC_NAME =>
new CreatableTopic()
.setName(topic)
-        .setNumPartitions(config.offsetsTopicPartitions)
-        .setReplicationFactor(config.offsetsTopicReplicationFactor)
+        .setNumPartitions(config.groupCoordinatorConfig.offsetsTopicPartitions)
+        .setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
case TRANSACTION_STATE_TOPIC_NAME =>
new CreatableTopic()
26 changes: 3 additions & 23 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
+import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde}
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.security.CredentialProvider
@@ -568,26 +568,6 @@ class BrokerServer(
if (config.isNewGroupCoordinatorEnabled) {
val time = Time.SYSTEM
val serde = new CoordinatorRecordSerde
-      val groupCoordinatorConfig = new GroupCoordinatorConfig(
-        config.groupCoordinatorNumThreads,
-        config.groupCoordinatorAppendLingerMs,
-        config.consumerGroupSessionTimeoutMs,
-        config.consumerGroupHeartbeatIntervalMs,
-        config.consumerGroupMaxSize,
-        config.consumerGroupAssignors,
-        config.offsetsTopicSegmentBytes,
-        config.offsetMetadataMaxSize,
-        config.groupMaxSize,
-        config.groupInitialRebalanceDelay,
-        GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
-        config.groupMinSessionTimeoutMs,
-        config.groupMaxSessionTimeoutMs,
-        config.offsetsRetentionCheckIntervalMs,
-        config.offsetsRetentionMinutes * 60 * 1000L,
-        config.offsetCommitTimeoutMs,
-        config.consumerGroupMigrationPolicy,
-        config.offsetsTopicCompressionType
-      )
val timer = new SystemTimerReaper(
"group-coordinator-reaper",
new SystemTimer("group-coordinator")
@@ -596,12 +576,12 @@
time,
replicaManager,
serde,
-        config.offsetsLoadBufferSize
+        config.groupCoordinatorConfig.offsetsLoadBufferSize
)
val writer = new CoordinatorPartitionWriter(
replicaManager
)
-      new GroupCoordinatorService.Builder(config.brokerId, groupCoordinatorConfig)
+      new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
.withTime(time)
.withTimer(timer)
.withLoader(loader)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
@@ -530,7 +530,7 @@ class KafkaApis(val requestChannel: RequestChannel,
authorizedTopicsRequest.foreach { topic =>
topic.partitions.forEach { partition =>
val error = try {
-          if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) {
+          if (partition.committedMetadata != null && partition.committedMetadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) {
Errors.OFFSET_METADATA_TOO_LARGE
} else {
zkSupport.zkClient.setOrCreateConsumerOffset(
53 changes: 11 additions & 42 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,10 +33,8 @@ import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
@@ -234,6 +232,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig

+  private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)
+  def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig

private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to true/false (Boolean)
@@ -439,8 +440,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val logCleanupIntervalMs = getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG)
def logCleanupPolicy = getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG)

-  val offsetsRetentionMinutes = getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG)
-  val offsetsRetentionCheckIntervalMs = getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG)
def logRetentionBytes = getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG)
val logCleanerDedupeBufferSize = getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
val logCleanerDedupeBufferLoadFactor = getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
@@ -562,12 +561,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported

-  /** ********* Group coordinator configuration ***********/
-  val groupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
-  val groupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
-  val groupInitialRebalanceDelay = getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)
-  val groupMaxSize = getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG)

/** New group coordinator configs */
val groupCoordinatorRebalanceProtocols = {
val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
@@ -588,19 +581,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// it is explicitly set; or 2) the consumer rebalance protocol is enabled.
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
-  val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
-  val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)

-  /** Consumer group configs */
-  val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
Review thread:

Contributor: @OmniaGM Could you please take a look? Does it follow your idea that we should move getters out of KafkaConfig?

Contributor: Sorry for the late response here, this looks great! I think the validators could also be moved out as well.

Contributor: Sure, we will file a MINOR for it.
-  val consumerGroupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
-  val consumerGroupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
-  val consumerGroupHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)
-  val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
-  val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
-  val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG)
-  val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor])
-  val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG))

/** Share group configuration **/
val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG)
@@ -618,17 +598,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG)
val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)

-  /** ********* Offset management configuration ***********/
-  val offsetMetadataMaxSize = getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG)
-  val offsetsLoadBufferSize = getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG)
-  val offsetsTopicReplicationFactor = getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)
-  val offsetsTopicPartitions = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)
-  val offsetCommitTimeoutMs = getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG)
-  @deprecated("3.8")
-  val offsetCommitRequiredAcks = getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)
-  val offsetsTopicSegmentBytes = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG)
-  val offsetsTopicCompressionType = Option(getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)).map(value => CompressionType.forId(value)).orNull

/** ********* Transaction management configuration ***********/
val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
@@ -923,7 +892,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
" to prevent unnecessary socket timeouts")
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
-    require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
+    require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor,
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet

@@ -1076,7 +1045,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher")

-    if (offsetsTopicCompressionType == CompressionType.ZSTD)
+    if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD)
require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value,
"offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
s"is set to version ${IBP_2_1_IV0.shortVersion} or higher")
@@ -1109,23 +1078,23 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")

// New group coordinator configs validation.
-    require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
+    require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
-    require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
+    require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
-    require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs,
+    require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")

-    require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
+    require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
-    require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
+    require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
-    require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
+    require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")

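Regarding the review suggestion above that the validators could also move out of KafkaConfig (to be filed as a MINOR): the cross-property require checks in the hunks above still run inside KafkaConfig's validation. A hedged sketch of where they could land, written in the same Scala style as the existing checks purely for illustration (method name and wiring are assumptions, not part of this PR):

def validate(): Unit = {
  // Hypothetical follow-up: bounds checks live next to the getters they constrain.
  require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
    s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
    s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
  require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
    s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
    s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
  // ...and the remaining heartbeat/session bounds currently in KafkaConfig
}

KafkaConfig could then delegate with a single groupCoordinatorConfig.validate() call during its own validation.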
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -501,7 +501,7 @@ class KafkaServer(
Time.SYSTEM,
metrics
)
-    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
+    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))

/* create producer ids manager */
val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
@@ -301,7 +301,7 @@ class BrokerMetadataPublisher(
try {
// Start the group coordinator.
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
-      .getOrElse(config.offsetsTopicPartitions))
+      .getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
}
@@ -163,7 +163,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup

TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers)
TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers,
-      numPartitions = servers.head.config.offsetsTopicPartitions,
+      numPartitions = servers.head.config.groupCoordinatorConfig.offsetsTopicPartitions,
replicationFactor = numServers,
topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)

@@ -84,7 +84,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest

metrics = new Metrics
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics)
-    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
enableMetadataExpiration = false)

// Transactional appends attempt to schedule to the request handler thread using
@@ -155,7 +155,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
groupCoordinator.shutdown()
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory,
rebalancePurgatory, timer.time, new Metrics())
-    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
enableMetadataExpiration = false)

val members = new Group(s"group", nMembersPerGroup, groupCoordinator, replicaManager)