-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig #16458
Changes from 1 commit
202a950
a761752
1231164
a23ea99
ffee5fc
524bd55
4c21247
bad7334
9e75d88
773effc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
*/ | ||
package org.apache.kafka.coordinator.group; | ||
|
||
import org.apache.kafka.common.config.AbstractConfig; | ||
import org.apache.kafka.common.config.ConfigDef; | ||
import org.apache.kafka.common.record.CompressionType; | ||
import org.apache.kafka.common.utils.Utils; | ||
|
@@ -26,6 +27,7 @@ | |
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; | ||
|
@@ -211,77 +213,111 @@ public class GroupCoordinatorConfig { | |
*/ | ||
public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000; | ||
|
||
private final AbstractConfig config; | ||
|
||
public GroupCoordinatorConfig(AbstractConfig config) { | ||
this.config = config; | ||
} | ||
|
||
/** | ||
* The number of threads or event loops running. | ||
*/ | ||
public final int numThreads; | ||
public int numThreads() { | ||
return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); | ||
} | ||
|
||
/** | ||
* The duration in milliseconds that the coordinator will wait for writes to | ||
* accumulate before flushing them to disk. | ||
*/ | ||
public final int appendLingerMs; | ||
public int appendLingerMs() { | ||
return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG); | ||
} | ||
|
||
/** | ||
* The consumer group session timeout in milliseconds. | ||
*/ | ||
public final int consumerGroupSessionTimeoutMs; | ||
public int consumerGroupSessionTimeoutMs() { | ||
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); | ||
} | ||
|
||
/** | ||
* The consumer group heartbeat interval in milliseconds. | ||
*/ | ||
public final int consumerGroupHeartbeatIntervalMs; | ||
public int consumerGroupHeartbeatIntervalMs() { | ||
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); | ||
} | ||
|
||
/** | ||
* The consumer group maximum size. | ||
*/ | ||
public final int consumerGroupMaxSize; | ||
public int consumerGroupMaxSize() { | ||
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); | ||
} | ||
|
||
/** | ||
* The consumer group assignors. | ||
*/ | ||
public final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors; | ||
public List<ConsumerGroupPartitionAssignor> consumerGroupAssignors() { | ||
return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); | ||
} | ||
|
||
/** | ||
* The offsets topic segment bytes should be kept relatively small to facilitate faster | ||
* log compaction and faster offset loads. | ||
*/ | ||
public final int offsetsTopicSegmentBytes; | ||
public int offsetsTopicSegmentBytes() { | ||
return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG); | ||
} | ||
|
||
/** | ||
* The maximum size for a metadata entry associated with an offset commit. | ||
*/ | ||
public final int offsetMetadataMaxSize; | ||
public int offsetMetadataMaxSize() { | ||
return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG); | ||
} | ||
|
||
/** | ||
* The classic group maximum size. | ||
*/ | ||
public final int classicGroupMaxSize; | ||
public int classicGroupMaxSize() { | ||
return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); | ||
} | ||
|
||
/** | ||
* The delay in milliseconds introduced for the first rebalance of a classic group. | ||
*/ | ||
public final int classicGroupInitialRebalanceDelayMs; | ||
public int classicGroupInitialRebalanceDelayMs() { | ||
return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); | ||
} | ||
|
||
/** | ||
* The timeout used to wait for a new member in milliseconds. | ||
*/ | ||
public final int classicGroupNewMemberJoinTimeoutMs; | ||
public int classicGroupNewMemberJoinTimeoutMs() { | ||
return CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS; | ||
} | ||
|
||
/** | ||
* The classic group minimum session timeout. | ||
*/ | ||
public final int classicGroupMinSessionTimeoutMs; | ||
public int classicGroupMinSessionTimeoutMs() { | ||
return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); | ||
} | ||
|
||
/** | ||
* The classic group maximum session timeout. | ||
*/ | ||
public final int classicGroupMaxSessionTimeoutMs; | ||
public int classicGroupMaxSessionTimeoutMs() { | ||
return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); | ||
} | ||
|
||
/** | ||
* Frequency at which to check for expired offsets. | ||
*/ | ||
public final long offsetsRetentionCheckIntervalMs; | ||
public long offsetsRetentionCheckIntervalMs() { | ||
return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); | ||
} | ||
|
||
/** | ||
* For subscribed consumers, committed offset of a specific partition will be expired and discarded when: | ||
|
@@ -297,61 +333,32 @@ public class GroupCoordinatorConfig { | |
* Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's | ||
* committed offsets for that topic will also be deleted without extra retention period. | ||
*/ | ||
public final long offsetsRetentionMs; | ||
public long offsetsRetentionMs() { | ||
return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; | ||
} | ||
|
||
/** | ||
* Offset commit will be delayed until all replicas for the offsets topic receive the commit | ||
* or this timeout is reached | ||
*/ | ||
public final int offsetCommitTimeoutMs; | ||
public int offsetCommitTimeoutMs() { | ||
return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); | ||
} | ||
Comment on lines
+344
to
+346
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an example of the config which is accessed extremely frequently (there are other like this too). I think that having attributes would be better as it avoid having to look it up in the config every time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good question! The approach of this PR is to make sure
WDYT? BTW, we had a related discussion in https://issues.apache.org/jira/browse/KAFKA-17001 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. None of the configs in this class are dynamic so I am not sure that it is worth it. We could perhaps mention it in the javadoc of the class. If we introduce a dynamic config, we should indeed not use an attribute for it. For the context, in KafkaConfig, we always had the distinction between val (static values) and def (dynamic ones). We could do the same here, I suppose. |
||
|
||
/** | ||
* The config indicating whether group protocol upgrade/downgrade are allowed. | ||
*/ | ||
public final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; | ||
public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() { | ||
return ConsumerGroupMigrationPolicy.parse( | ||
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); | ||
} | ||
Comment on lines
+351
to
+354
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would like to point out that this change is a bit risky in my opinion because it does not ensure during the startup that the migration policy is really correct. If it fails somehow, it will fail later on when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The validation is addressed by the https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L182 Hence, the string value is valid in constructing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. I was thinking about the case where the validation has a bug or is not good enough. |
||
|
||
/** | ||
* The compression type used to compress records in batches. | ||
*/ | ||
public final CompressionType compressionType; | ||
|
||
public GroupCoordinatorConfig( | ||
int numThreads, | ||
int appendLingerMs, | ||
int consumerGroupSessionTimeoutMs, | ||
int consumerGroupHeartbeatIntervalMs, | ||
int consumerGroupMaxSize, | ||
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors, | ||
int offsetsTopicSegmentBytes, | ||
int offsetMetadataMaxSize, | ||
int classicGroupMaxSize, | ||
int classicGroupInitialRebalanceDelayMs, | ||
int classicGroupNewMemberJoinTimeoutMs, | ||
int classicGroupMinSessionTimeoutMs, | ||
int classicGroupMaxSessionTimeoutMs, | ||
long offsetsRetentionCheckIntervalMs, | ||
long offsetsRetentionMs, | ||
int offsetCommitTimeoutMs, | ||
ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy, | ||
CompressionType compressionType | ||
) { | ||
this.numThreads = numThreads; | ||
this.appendLingerMs = appendLingerMs; | ||
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs; | ||
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; | ||
this.consumerGroupMaxSize = consumerGroupMaxSize; | ||
this.consumerGroupAssignors = consumerGroupAssignors; | ||
this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes; | ||
this.offsetMetadataMaxSize = offsetMetadataMaxSize; | ||
this.classicGroupMaxSize = classicGroupMaxSize; | ||
this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs; | ||
this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs; | ||
this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs; | ||
this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs; | ||
this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs; | ||
this.offsetsRetentionMs = offsetsRetentionMs; | ||
this.offsetCommitTimeoutMs = offsetCommitTimeoutMs; | ||
this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy; | ||
this.compressionType = compressionType; | ||
public CompressionType compressionType() { | ||
return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) | ||
.map(CompressionType::forId) | ||
.orElse(null); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a method to
KafkaConfig
to returnGroupCoordinatorConfig
? With that change, we can remove all GroupCoordinatorConfig-related getters fromKafkaConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the suggestion! I have added the method to KafkaConfig to return GroupCoordinatorConfig and removed all related getters. This change is included in the latest commits.