KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) #19802
base: trunk
…am group Signed-off-by: PoAn Yang <[email protected]>
this.partitionMetadata.clear();
this.partitionMetadata.putAll(partitionMetadata);
maybeUpdateConfiguredTopology();
maybeUpdateGroupState();
To implement KIP-1101 in stream group, we have to remove `StreamsGroupPartitionMetadataKey` and `StreamsGroupPartitionMetadataValue`. This saves disk storage, and the group coordinator can still trigger a rebalance when the metadata hash changes.
In stream group, the coordinator tries to calculate the configured topology and update the group state whenever `partitionMetadata` or `topology` data is updated. I wonder why it doesn't save `configuredTopology` and `state` to records, so that they can be recovered when replaying records?
`ConfiguredTopology` can be derived completely from the records for `PartitionMetadata` and `Topology`. So we just need to cache it, and can recreate it upon the first heartbeat after failover. We could consider persisting it, but that would just mean storing somewhat duplicate data. `state` is also derived from the other records.
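The caching idea described above can be sketched roughly as follows. This is a minimal illustration, not Kafka's `StreamsGroup`: the class `DerivedStateGroup`, its string-based topology, and the `rebuildCount` field are all hypothetical names chosen for this example. It only shows that a value derived from replayed records can live in memory and be recreated on the first heartbeat after a failover.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for a group whose persisted state is
// restored by replaying records. It is NOT Kafka's StreamsGroup.
final class DerivedStateGroup {
    // Persisted inputs: restored by replaying records after a failover.
    private final Map<String, Integer> partitionMetadata = new TreeMap<>();
    private Optional<String> topology = Optional.empty();

    // Derived value: kept only in memory, never written to a record.
    private Optional<String> configuredTopology = Optional.empty();

    // Counts rebuilds so the caching behaviour can be observed.
    int rebuildCount = 0;

    void replayTopology(String newTopology) {
        topology = Optional.of(newTopology);
    }

    void replayPartitionMetadata(Map<String, Integer> metadata) {
        partitionMetadata.clear();
        partitionMetadata.putAll(metadata);
    }

    // Called on each heartbeat: derive the cached value only if it is
    // missing and the inputs it depends on are available.
    Optional<String> heartbeat() {
        if (!configuredTopology.isPresent() && topology.isPresent()) {
            configuredTopology = Optional.of(topology.get() + "@" + partitionMetadata);
            rebuildCount++;
        }
        return configuredTopology;
    }
}
```

In this sketch a freshly replayed group has no cached value, the first heartbeat derives it, and later heartbeats reuse it. Nothing derived is persisted, so no duplicate data is stored.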
Thanks for the reply. I will keep working on this PR and change `streamsGroupHeartbeat` to rebuild these two fields if they are empty.
Could you instead store the group metadata hash along with / inside ConfiguredTopology, and rebuild configuredTopology not only when it is empty, but also when the group metadata hash does not match the current group metadata hash?
Also, that would mean that when we call `setTopology` or `setPartitionMetadata` inside `StreamsGroup`, we will not call `maybeUpdateConfiguredTopology`, since it will only be created in one place: from the `streamsGroupHeartbeat`.
We may not be able to set the metadata hash in `ConfiguredTopology`. When replaying `StreamsGroupMetadataValue`, we need to set `metadataHash` back. If we add it to `ConfiguredTopology`, that means there will be a non-empty `ConfiguredTopology` in the group, and its `metadataHash` may be the same as the latest computed value. Then we don't have a value to compare against to decide whether we need to compute a new `ConfiguredTopology`.
Ah, I did not mean it as a replacement for the `TimelineLong` inside `StreamsGroup`. We need an additional hash to be stored inside `ConfiguredTopology`, to remember for which state of the topics it was calculated.
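One way to picture the two-hash arrangement discussed in this thread is the sketch below. It is only an illustration under stated assumptions: `StampedTopology`, `HashTrackingGroup`, `groupEpoch`, and `rebuildCount` are hypothetical names, a plain `long` stands in for the `TimelineLong`, and the "topology" is a placeholder string. The group-level hash is restored on replay, while the cached topology carries its own copy of the hash it was computed for, so the heartbeat can rebuild it when it is missing or stale.

```java
import java.util.Optional;

// Hypothetical cached value that remembers which metadata hash it was built for.
final class StampedTopology {
    final String description;
    final long computedForHash;

    StampedTopology(String description, long computedForHash) {
        this.description = description;
        this.computedForHash = computedForHash;
    }
}

// Hypothetical, simplified group. It is NOT Kafka's StreamsGroup.
final class HashTrackingGroup {
    // Group-level hash, restored from the replayed metadata record
    // (a plain long here; the thread refers to a TimelineLong).
    private long metadataHash;

    // In-memory cache only; lost on failover.
    private Optional<StampedTopology> configuredTopology = Optional.empty();

    int groupEpoch = 0;    // assumed to be bumped when the metadata hash changes
    int rebuildCount = 0;  // counts rebuilds so the behaviour can be observed

    void replayMetadataHash(long hash) {
        metadataHash = hash;
    }

    // The single place where the configured topology is (re)built.
    StampedTopology heartbeat(long currentMetadataHash) {
        // Comparison 1: group-level hash decides whether the group changed.
        if (currentMetadataHash != metadataHash) {
            metadataHash = currentMetadataHash;
            groupEpoch++;
        }
        // Comparison 2: the stamp inside the cache decides whether to rebuild.
        boolean stale = !configuredTopology.isPresent()
            || configuredTopology.get().computedForHash != currentMetadataHash;
        if (stale) {
            configuredTopology = Optional.of(
                new StampedTopology("configured-for-" + currentMetadataHash, currentMetadataHash));
            rebuildCount++;
        }
        return configuredTopology.get();
    }
}
```

Because the stamp lives inside the cached object, restoring the group-level hash on replay does not hide a missing or outdated cache: after a failover the cache is empty and gets rebuilt even though the two group-level hashes match.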
…dataValue Signed-off-by: PoAn Yang <[email protected]>
`StreamsGroupPartitionMetadataKey` and `StreamsGroupPartitionMetadataValue`.
`configuredTopology` is empty. If it is, call `InternalTopicManager.configureTopics` and set the result to the group.