Skip to content

Commit

Permalink
KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig (#16458)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
brandboat committed Jul 1, 2024
1 parent 7d91bc8 commit 206d0f8
Show file tree
Hide file tree
Showing 21 changed files with 312 additions and 284 deletions.
28 changes: 14 additions & 14 deletions core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1770,16 +1770,16 @@ object GroupCoordinator {

@nowarn("cat=deprecation")
private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
config.offsetMetadataMaxSize,
config.offsetsLoadBufferSize,
config.offsetsRetentionMinutes * 60L * 1000L,
config.offsetsRetentionCheckIntervalMs,
config.offsetsTopicPartitions,
config.offsetsTopicSegmentBytes,
config.offsetsTopicReplicationFactor,
config.offsetsTopicCompressionType,
config.offsetCommitTimeoutMs,
config.offsetCommitRequiredAcks
config.groupCoordinatorConfig.offsetMetadataMaxSize,
config.groupCoordinatorConfig.offsetsLoadBufferSize,
config.groupCoordinatorConfig.offsetsRetentionMs,
config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs,
config.groupCoordinatorConfig.offsetsTopicPartitions,
config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
config.groupCoordinatorConfig.offsetTopicCompressionType,
config.groupCoordinatorConfig.offsetCommitTimeoutMs,
config.groupCoordinatorConfig.offsetCommitRequiredAcks
)

private[group] def apply(
Expand All @@ -1791,10 +1791,10 @@ object GroupCoordinator {
metrics: Metrics
): GroupCoordinator = {
val offsetConfig = this.offsetConfig(config)
val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
groupMaxSize = config.groupMaxSize,
groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs,
groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize,
groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs)

val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
offsetConfig, replicaManager, time, metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ class DefaultAutoTopicCreationManager(
case GROUP_METADATA_TOPIC_NAME =>
new CreatableTopic()
.setName(topic)
.setNumPartitions(config.offsetsTopicPartitions)
.setReplicationFactor(config.offsetsTopicReplicationFactor)
.setNumPartitions(config.groupCoordinatorConfig.offsetsTopicPartitions)
.setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
case TRANSACTION_STATE_TOPIC_NAME =>
new CreatableTopic()
Expand Down
26 changes: 3 additions & 23 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde}
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.security.CredentialProvider
Expand Down Expand Up @@ -568,26 +568,6 @@ class BrokerServer(
if (config.isNewGroupCoordinatorEnabled) {
val time = Time.SYSTEM
val serde = new CoordinatorRecordSerde
val groupCoordinatorConfig = new GroupCoordinatorConfig(
config.groupCoordinatorNumThreads,
config.groupCoordinatorAppendLingerMs,
config.consumerGroupSessionTimeoutMs,
config.consumerGroupHeartbeatIntervalMs,
config.consumerGroupMaxSize,
config.consumerGroupAssignors,
config.offsetsTopicSegmentBytes,
config.offsetMetadataMaxSize,
config.groupMaxSize,
config.groupInitialRebalanceDelay,
GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
config.groupMinSessionTimeoutMs,
config.groupMaxSessionTimeoutMs,
config.offsetsRetentionCheckIntervalMs,
config.offsetsRetentionMinutes * 60 * 1000L,
config.offsetCommitTimeoutMs,
config.consumerGroupMigrationPolicy,
config.offsetsTopicCompressionType
)
val timer = new SystemTimerReaper(
"group-coordinator-reaper",
new SystemTimer("group-coordinator")
Expand All @@ -596,12 +576,12 @@ class BrokerServer(
time,
replicaManager,
serde,
config.offsetsLoadBufferSize
config.groupCoordinatorConfig.offsetsLoadBufferSize
)
val writer = new CoordinatorPartitionWriter(
replicaManager
)
new GroupCoordinatorService.Builder(config.brokerId, groupCoordinatorConfig)
new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
.withTime(time)
.withTimer(timer)
.withLoader(loader)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ class KafkaApis(val requestChannel: RequestChannel,
authorizedTopicsRequest.foreach { topic =>
topic.partitions.forEach { partition =>
val error = try {
if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) {
if (partition.committedMetadata != null && partition.committedMetadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) {
Errors.OFFSET_METADATA_TOO_LARGE
} else {
zkSupport.zkClient.setOrCreateConsumerOffset(
Expand Down
53 changes: 11 additions & 42 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
Expand Down Expand Up @@ -234,6 +232,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig

private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)
def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig

private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to true/false (Boolean)
Expand Down Expand Up @@ -439,8 +440,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val logCleanupIntervalMs = getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG)
def logCleanupPolicy = getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG)

val offsetsRetentionMinutes = getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG)
val offsetsRetentionCheckIntervalMs = getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG)
def logRetentionBytes = getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG)
val logCleanerDedupeBufferSize = getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
val logCleanerDedupeBufferLoadFactor = getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
Expand Down Expand Up @@ -562,12 +561,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported

/** ********* Group coordinator configuration ***********/
val groupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
val groupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
val groupInitialRebalanceDelay = getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)
val groupMaxSize = getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG)

/** New group coordinator configs */
val groupCoordinatorRebalanceProtocols = {
val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
Expand All @@ -588,19 +581,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// it is explicitly set; or 2) the consumer rebalance protocol is enabled.
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)

/** Consumer group configs */
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
val consumerGroupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
val consumerGroupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
val consumerGroupHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)
val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG)
val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor])
val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG))

/** Share group configuration **/
val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG)
Expand All @@ -618,17 +598,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG)
val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)

/** ********* Offset management configuration ***********/
val offsetMetadataMaxSize = getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG)
val offsetsLoadBufferSize = getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG)
val offsetsTopicReplicationFactor = getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)
val offsetsTopicPartitions = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)
val offsetCommitTimeoutMs = getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG)
@deprecated("3.8")
val offsetCommitRequiredAcks = getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)
val offsetsTopicSegmentBytes = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG)
val offsetsTopicCompressionType = Option(getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)).map(value => CompressionType.forId(value)).orNull

/** ********* Transaction management configuration ***********/
val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
Expand Down Expand Up @@ -923,7 +892,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
" to prevent unnecessary socket timeouts")
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor,
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet

Expand Down Expand Up @@ -1076,7 +1045,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher")

if (offsetsTopicCompressionType == CompressionType.ZSTD)
if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD)
require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value,
"offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
s"is set to version ${IBP_2_1_IV0.shortVersion} or higher")
Expand Down Expand Up @@ -1109,23 +1078,23 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")

// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs,
require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")

require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ class KafkaServer(
Time.SYSTEM,
metrics
)
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))

/* create producer ids manager */
val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class BrokerMetadataPublisher(
try {
// Start the group coordinator.
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
.getOrElse(config.offsetsTopicPartitions))
.getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup

TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers)
TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers,
numPartitions = servers.head.config.offsetsTopicPartitions,
numPartitions = servers.head.config.groupCoordinatorConfig.offsetsTopicPartitions,
replicationFactor = numServers,
topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest

metrics = new Metrics
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics)
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
enableMetadataExpiration = false)

// Transactional appends attempt to schedule to the request handler thread using
Expand Down Expand Up @@ -155,7 +155,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
groupCoordinator.shutdown()
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory,
rebalancePurgatory, timer.time, new Metrics())
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
enableMetadataExpiration = false)

val members = new Group(s"group", nMembersPerGroup, groupCoordinator, replicaManager)
Expand Down
Loading

0 comments on commit 206d0f8

Please sign in to comment.