-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig #16458
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for this patch
config.consumerGroupMigrationPolicy, | ||
config.offsetsTopicCompressionType | ||
) | ||
val groupCoordinatorConfig = new GroupCoordinatorConfig(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a method to KafkaConfig
to return GroupCoordinatorConfig
? With that change, we can remove all GroupCoordinatorConfig-related getters from KafkaConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the suggestion! I have added the method to KafkaConfig to return GroupCoordinatorConfig and removed all related getters. This change is included in the latest commits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for this patch
@@ -2576,15 +2577,15 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() { | |||
|
|||
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() | |||
.withGroupMetadataManager(groupMetadataManager) | |||
.withOffsetsRetentionMs(1000) | |||
.withOffsetsRetentionMinutes(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure why we need this change. Also, the value is changed from 1 second to 1 minute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the original time unit of config offsets.retention.minutes
is in minutes, and before this refactor we pass offsets.retention
in millis to GroupCoordinatorConfig constructor and use it in OffsetMetadataManagerTest, now we pass AbstractConfig (i.e. KafkaConfig) to GroupCoordinatorConfig, which means we should pass offsets.retention in minutes instead of ms
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); | ||
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); | ||
|
||
return new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return new GroupCoordinatorConfig(new AbstractConfig(Utils.mergeConfigs(Arrays.asList(
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)),
configs, false));
WDYT? we don't need the temporary class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do, thanks for the suggestion.
val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG) | ||
|
||
/** Consumer group configs */ | ||
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@OmniaGM Could you please take a look? Does it follow your idea that we should move getters out of KafkaConfig
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
related to https://issues.apache.org/jira/browse/KAFKA-16909
as title, this pr follows RemoteLogManagerConfig.java, pass AbstractConfig to constructor.
Committer Checklist (excluded from commit message)