Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig #16458

Merged
merged 10 commits into from
Jul 1, 2024
21 changes: 1 addition & 20 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -568,26 +568,7 @@ class BrokerServer(
if (config.isNewGroupCoordinatorEnabled) {
val time = Time.SYSTEM
val serde = new CoordinatorRecordSerde
val groupCoordinatorConfig = new GroupCoordinatorConfig(
config.groupCoordinatorNumThreads,
config.groupCoordinatorAppendLingerMs,
config.consumerGroupSessionTimeoutMs,
config.consumerGroupHeartbeatIntervalMs,
config.consumerGroupMaxSize,
config.consumerGroupAssignors,
config.offsetsTopicSegmentBytes,
config.offsetMetadataMaxSize,
config.groupMaxSize,
config.groupInitialRebalanceDelay,
GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
config.groupMinSessionTimeoutMs,
config.groupMaxSessionTimeoutMs,
config.offsetsRetentionCheckIntervalMs,
config.offsetsRetentionMinutes * 60 * 1000L,
config.offsetCommitTimeoutMs,
config.consumerGroupMigrationPolicy,
config.offsetsTopicCompressionType
)
val groupCoordinatorConfig = new GroupCoordinatorConfig(config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a method to KafkaConfig to return GroupCoordinatorConfig? With that change, we can remove all GroupCoordinatorConfig-related getters from KafkaConfig

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the suggestion! I have added the method to KafkaConfig to return GroupCoordinatorConfig and removed all related getters. This change is included in the latest commits.

val timer = new SystemTimerReaper(
"group-coordinator-reaper",
new SystemTimer("group-coordinator")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Utils;
Expand All @@ -26,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
Expand Down Expand Up @@ -211,77 +213,111 @@ public class GroupCoordinatorConfig {
*/
public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000;

/** Backing store for every group coordinator property; all getters read from it. */
private final AbstractConfig config;

/**
 * Creates a read-through view over the given broker configuration.
 *
 * @param config the parsed {@link AbstractConfig}; must not be null
 * @throws NullPointerException if {@code config} is null (fail fast here
 *         rather than on the first getter call)
 */
public GroupCoordinatorConfig(AbstractConfig config) {
    this.config = java.util.Objects.requireNonNull(config, "config");
}

/**
* The number of threads or event loops running.
*/
public final int numThreads;
/**
 * Returns the number of threads or event loops the coordinator runs.
 */
public int numThreads() {
    return config.getInt(GROUP_COORDINATOR_NUM_THREADS_CONFIG);
}

/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
*/
public final int appendLingerMs;
/**
 * Returns how long, in milliseconds, the coordinator lets writes accumulate
 * before flushing them to disk.
 */
public int appendLingerMs() {
    return config.getInt(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
}

/**
* The consumer group session timeout in milliseconds.
*/
public final int consumerGroupSessionTimeoutMs;
/**
 * Returns the consumer group session timeout in milliseconds.
 */
public int consumerGroupSessionTimeoutMs() {
    return config.getInt(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
}

/**
* The consumer group heartbeat interval in milliseconds.
*/
public final int consumerGroupHeartbeatIntervalMs;
/**
 * Returns the consumer group heartbeat interval in milliseconds.
 */
public int consumerGroupHeartbeatIntervalMs() {
    return config.getInt(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
}

/**
* The consumer group maximum size.
*/
public final int consumerGroupMaxSize;
/**
 * Returns the maximum number of members allowed in a consumer group.
 */
public int consumerGroupMaxSize() {
    return config.getInt(CONSUMER_GROUP_MAX_SIZE_CONFIG);
}

/**
* The consumer group assignors.
*/
public final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
/**
 * Returns the configured consumer group partition assignors, instantiated
 * and configured from the assignors config on every call.
 */
public List<ConsumerGroupPartitionAssignor> consumerGroupAssignors() {
    return config.getConfiguredInstances(
        CONSUMER_GROUP_ASSIGNORS_CONFIG,
        ConsumerGroupPartitionAssignor.class
    );
}

/**
* The offsets topic segment bytes should be kept relatively small to facilitate faster
* log compaction and faster offset loads.
*/
public final int offsetsTopicSegmentBytes;
/**
 * Returns the offsets topic segment size in bytes; kept relatively small to
 * facilitate faster log compaction and faster offset loads.
 */
public int offsetsTopicSegmentBytes() {
    return config.getInt(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
}

/**
* The maximum size for a metadata entry associated with an offset commit.
*/
public final int offsetMetadataMaxSize;
/**
 * Returns the maximum size of a metadata entry attached to an offset commit.
 */
public int offsetMetadataMaxSize() {
    return config.getInt(OFFSET_METADATA_MAX_SIZE_CONFIG);
}

/**
* The classic group maximum size.
*/
public final int classicGroupMaxSize;
/**
 * Returns the maximum number of members allowed in a classic group.
 */
public int classicGroupMaxSize() {
    return config.getInt(GROUP_MAX_SIZE_CONFIG);
}

/**
* The delay in milliseconds introduced for the first rebalance of a classic group.
*/
public final int classicGroupInitialRebalanceDelayMs;
/**
 * Returns the delay, in milliseconds, applied to the first rebalance of a
 * classic group.
 */
public int classicGroupInitialRebalanceDelayMs() {
    return config.getInt(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
}

/**
* The timeout used to wait for a new member in milliseconds.
*/
public final int classicGroupNewMemberJoinTimeoutMs;
/**
 * Returns the timeout, in milliseconds, used while waiting for a new member
 * to join. Note this is a compile-time constant, not a broker config.
 */
public int classicGroupNewMemberJoinTimeoutMs() {
    return GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS;
}

/**
* The classic group minimum session timeout.
*/
public final int classicGroupMinSessionTimeoutMs;
/**
 * Returns the minimum session timeout accepted for a classic group.
 */
public int classicGroupMinSessionTimeoutMs() {
    return config.getInt(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
}

/**
* The classic group maximum session timeout.
*/
public final int classicGroupMaxSessionTimeoutMs;
/**
 * Returns the maximum session timeout accepted for a classic group.
 */
public int classicGroupMaxSessionTimeoutMs() {
    return config.getInt(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
}

/**
* Frequency at which to check for expired offsets.
*/
public final long offsetsRetentionCheckIntervalMs;
/**
 * Returns how often, in milliseconds, expired offsets are checked for.
 */
public long offsetsRetentionCheckIntervalMs() {
    return config.getLong(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
}

/**
* For subscribed consumers, committed offset of a specific partition will be expired and discarded when:
Expand All @@ -297,61 +333,32 @@ public class GroupCoordinatorConfig {
* Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's
* committed offsets for that topic will also be deleted without extra retention period.
*/
public final long offsetsRetentionMs;
/**
 * Returns the offsets retention period in milliseconds, converted from the
 * retention config expressed in minutes.
 */
public long offsetsRetentionMs() {
    // Widen to long before scaling so the arithmetic cannot overflow int.
    final long retentionMinutes = config.getInt(OFFSETS_RETENTION_MINUTES_CONFIG);
    return retentionMinutes * 60L * 1000L;
}

/**
* Offset commit will be delayed until all replicas for the offsets topic receive the commit
* or this timeout is reached
*/
public final int offsetCommitTimeoutMs;
/**
 * Returns the timeout, in milliseconds, after which an offset commit is
 * completed even if not all replicas of the offsets topic received it.
 */
public int offsetCommitTimeoutMs() {
    return config.getInt(OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
}
Comment on lines +344 to +346
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of a config which is accessed extremely frequently (there are others like this too). I think that having attributes would be better, as it avoids having to look the value up in the config every time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good question!

The approach of this PR is to make sure GroupCoordinatorConfig can see the latest configs to avoid potential bugs (#16394). However, it has a side effect which brings extra cost: "volatile" and "lookup/parse".

  • The cost of "volatile" (or other similar sync trick) is required if we make GroupCoordinatorConfig see latest configs.
  • The cost of "lookup/parse" could be eliminated if we do a bit of refactoring. For example, we could pass Supplier<GroupCoordinatorConfig> instead of GroupCoordinatorConfig to the callers. With that change, we can make GroupCoordinatorConfig hold only immutable, pre-created local attributes. The impl of Supplier<GroupCoordinatorConfig> would be generated by KafkaConfig and would look like AtomicReference::get. However, the side effect is that the usage gets a little ugly: "config.numThreads()" -> "config.get().numThreads()"

WDYT? BTW, we had a related discussion in https://issues.apache.org/jira/browse/KAFKA-17001

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the configs in this class are dynamic so I am not sure that it is worth it. We could perhaps mention it in the javadoc of the class. If we introduce a dynamic config, we should indeed not use an attribute for it.

For the context, in KafkaConfig, we always had the distinction between val (static values) and def (dynamic ones). We could do the same here, I suppose.


/**
* The config indicating whether group protocol upgrade/downgrade are allowed.
*/
public final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
/**
 * Returns the policy indicating whether group protocol upgrade/downgrade
 * (migration) is allowed, parsed from its string config value on each call.
 */
public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() {
    final String rawPolicy = config.getString(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG);
    return ConsumerGroupMigrationPolicy.parse(rawPolicy);
}
Comment on lines +351 to +354
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to point out that this change is a bit risky in my opinion because it does not ensure during the startup that the migration policy is really correct. If it fails somehow, it will fail later on when consumerGroupMigrationPolicy is accessed for the first time. I wonder if we should keep local attributes and initialize them in the constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validation is addressed by the GroupCoordinatorConfig's config definition.

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L182

Hence, the string value is guaranteed to be valid by the time GroupCoordinatorConfig is constructed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. I was thinking about the case where the validation has a bug or is not good enough.


/**
* The compression type used to compress records in batches.
*/
public final CompressionType compressionType;

public GroupCoordinatorConfig(
int numThreads,
int appendLingerMs,
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
int offsetsTopicSegmentBytes,
int offsetMetadataMaxSize,
int classicGroupMaxSize,
int classicGroupInitialRebalanceDelayMs,
int classicGroupNewMemberJoinTimeoutMs,
int classicGroupMinSessionTimeoutMs,
int classicGroupMaxSessionTimeoutMs,
long offsetsRetentionCheckIntervalMs,
long offsetsRetentionMs,
int offsetCommitTimeoutMs,
ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy,
CompressionType compressionType
) {
this.numThreads = numThreads;
this.appendLingerMs = appendLingerMs;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupAssignors = consumerGroupAssignors;
this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
this.offsetMetadataMaxSize = offsetMetadataMaxSize;
this.classicGroupMaxSize = classicGroupMaxSize;
this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs;
this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs;
this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs;
this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs;
this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
this.offsetsRetentionMs = offsetsRetentionMs;
this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
this.compressionType = compressionType;
/**
 * Returns the compression type used to compress record batches, resolved
 * from the offsets topic compression codec id.
 *
 * <p>NOTE(review): returns null when the codec config value is null — confirm
 * the config definition guarantees a non-null default before relying on this.
 */
public CompressionType compressionType() {
    final Integer codecId = config.getInt(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG);
    return codecId == null ? null : CompressionType.forId(codecId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public GroupCoordinatorService build() {
CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext,
"group-coordinator-event-processor-",
config.numThreads,
config.numThreads(),
time,
coordinatorRuntimeMetrics
);
Expand All @@ -183,12 +183,12 @@ public GroupCoordinatorService build() {
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
.withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
.withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs()))
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.withCoordinatorMetrics(groupCoordinatorMetrics)
.withSerializer(new CoordinatorRecordSerde())
.withCompression(Compression.of(config.compressionType).build())
.withAppendLingerMs(config.appendLingerMs)
.withCompression(Compression.of(config.compressionType()).build())
.withAppendLingerMs(config.appendLingerMs())
.build();

return new GroupCoordinatorService(
Expand Down Expand Up @@ -296,7 +296,7 @@ public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb
return runtime.scheduleWriteOperation(
"consumer-group-heartbeat",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.consumerGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
"consumer-group-heartbeat",
Expand Down Expand Up @@ -331,8 +331,8 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
);
}

if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs ||
request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs) {
if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs() ||
request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs()) {
return CompletableFuture.completedFuture(new JoinGroupResponseData()
.setMemberId(request.memberId())
.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
Expand All @@ -344,7 +344,7 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
runtime.scheduleWriteOperation(
"classic-group-join",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupJoin(context, request, responseFuture)
).exceptionally(exception -> {
if (!responseFuture.isDone()) {
Expand Down Expand Up @@ -387,7 +387,7 @@ public CompletableFuture<SyncGroupResponseData> syncGroup(
runtime.scheduleWriteOperation(
"classic-group-sync",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupSync(context, request, responseFuture)
).exceptionally(exception -> {
if (!responseFuture.isDone()) {
Expand Down Expand Up @@ -427,7 +427,7 @@ public CompletableFuture<HeartbeatResponseData> heartbeat(
return runtime.scheduleWriteOperation(
"classic-group-heartbeat",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
"classic-group-heartbeat",
Expand Down Expand Up @@ -469,7 +469,7 @@ public CompletableFuture<LeaveGroupResponseData> leaveGroup(
return runtime.scheduleWriteOperation(
"classic-group-leave",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupLeave(context, request)
).exceptionally(exception -> handleOperationException(
"classic-group-leave",
Expand Down Expand Up @@ -682,7 +682,7 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
runtime.scheduleWriteOperation(
"delete-groups",
topicPartition,
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.deleteGroups(context, groupList)
).exceptionally(exception -> handleOperationException(
"delete-groups",
Expand Down Expand Up @@ -736,7 +736,7 @@ public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
return runtime.scheduleWriteOperation(
"fetch-offsets",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchOffsets(request, Long.MAX_VALUE)
Expand Down Expand Up @@ -787,7 +787,7 @@ public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
return runtime.scheduleWriteOperation(
"fetch-all-offsets",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
Expand Down Expand Up @@ -829,7 +829,7 @@ public CompletableFuture<OffsetCommitResponseData> commitOffsets(
return runtime.scheduleWriteOperation(
"commit-offset",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.commitOffset(context, request)
).exceptionally(exception -> handleOperationException(
"commit-offset",
Expand Down Expand Up @@ -868,7 +868,7 @@ public CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets
request.transactionalId(),
request.producerId(),
request.producerEpoch(),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.commitTransactionalOffset(context, request),
context.apiVersion()
).exceptionally(exception -> handleOperationException(
Expand Down Expand Up @@ -903,7 +903,7 @@ public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
return runtime.scheduleWriteOperation(
"delete-offsets",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.deleteOffsets(context, request)
).exceptionally(exception -> handleOperationException(
"delete-offsets",
Expand Down Expand Up @@ -973,7 +973,7 @@ public void onPartitionsDeleted(
FutureUtils.mapExceptionally(
runtime.scheduleWriteAllOperation(
"on-partition-deleted",
Duration.ofMillis(config.offsetCommitTimeoutMs),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.onPartitionsDeleted(topicPartitions)
),
exception -> {
Expand Down Expand Up @@ -1036,7 +1036,7 @@ public Properties groupMetadataTopicConfigs() {
Properties properties = new Properties();
properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes));
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes()));
return properties;
}

Expand Down
Loading