
KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) #19802


Open · FrankYang0529 wants to merge 3 commits into trunk

Conversation

@FrankYang0529 (Member) commented May 24, 2025

  • Use a metadata hash to replace subscription metadata.
  • Remove StreamsGroupPartitionMetadataKey and StreamsGroupPartitionMetadataValue.
  • Check whether configuredTopology is empty. If it is, call InternalTopicManager.configureTopics and set the result on the group (see the sketch below).
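
A minimal sketch of that third point, using simplified stand-in types rather than the real group-coordinator classes (GroupState, configureTopics and onHeartbeat below are hypothetical; only the names ConfiguredTopology, metadataHash and InternalTopicManager.configureTopics come from the PR itself): when the cached topology is empty, the heartbeat path recomputes it and stores the result back on the group.

import java.util.Optional;

final class HeartbeatTopologySketch {

    // Simplified stand-in for the group's cached, derived topology.
    record ConfiguredTopology(long metadataHash) { }

    // Simplified stand-in for the parts of StreamsGroup this sketch needs.
    static final class GroupState {
        long metadataHash;                                    // replaces persisted partition metadata
        Optional<ConfiguredTopology> configuredTopology = Optional.empty();
    }

    // Stand-in for the InternalTopicManager.configureTopics call mentioned above.
    static ConfiguredTopology configureTopics(long currentMetadataHash) {
        return new ConfiguredTopology(currentMetadataHash);
    }

    // Heartbeat path: if the cached topology is missing (for example after a
    // coordinator failover), rebuild it and set the result on the group.
    static void onHeartbeat(GroupState group, long currentMetadataHash) {
        group.metadataHash = currentMetadataHash;
        if (group.configuredTopology.isEmpty()) {
            group.configuredTopology = Optional.of(configureTopics(currentMetadataHash));
        }
    }

    public static void main(String[] args) {
        GroupState group = new GroupState();
        onHeartbeat(group, 42L);
        System.out.println(group.configuredTopology);         // rebuilt because the cache was empty
    }
}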

// StreamsGroup.setPartitionMetadata (quoted diff context): replace the cached
// partition metadata, then refresh the derived configured topology and group state.
this.partitionMetadata.clear();
this.partitionMetadata.putAll(partitionMetadata);
maybeUpdateConfiguredTopology();
maybeUpdateGroupState();
Member Author (FrankYang0529):
To implement KIP-1101 in the streams group, we have to remove StreamsGroupPartitionMetadataKey and StreamsGroupPartitionMetadataValue. This saves disk storage, and the group coordinator can still trigger a rebalance when the metadata hash changes.

In the streams group, the configured topology is recalculated and the group state is updated whenever partitionMetadata or the topology data is updated. I wonder why configuredTopology and state are not saved to records, so that they could be recovered when replaying records?

cc @lucasbru @bbejeck @chia7712
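
For illustration only, here is a hedged sketch of the idea in the first paragraph above: fold each topic's name and partition count into a single long, and let a change in that value be the signal that a rebalance is needed. The combining function below is an assumption made for the sketch; it is not the actual KIP-1101 hash.

import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

final class MetadataHashSketch {

    // Deterministic hash over a topicName -> partitionCount view of the cluster.
    // TreeMap gives a stable iteration order, so the same topics always hash the same way.
    static long computeMetadataHash(Map<String, Integer> partitionCounts) {
        long hash = 0L;
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionCounts).entrySet()) {
            hash = hash * 31 + Objects.hash(e.getKey(), e.getValue());
        }
        return hash;
    }

    public static void main(String[] args) {
        long before = computeMetadataHash(Map.of("input", 3, "repartition", 3));
        long after = computeMetadataHash(Map.of("input", 3, "repartition", 6));
        // A changed hash is enough to know a rebalance is needed, without
        // persisting the full per-partition metadata as its own record.
        System.out.println("rebalance needed: " + (before != after));
    }
}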

Member:

ConfiguredTopology can be derived completely from the records for PartitionMetadata and Topology. So we just need to cache it, and can recreate it upon the first heartbeat after failover. We could consider persisting it, but that would just mean storing somewhat duplicate data. state is also derived from the other records.

Member Author (FrankYang0529):

Thanks for the reply. I will keep working on this PR and change streamsGroupHeartbeat to rebuild these two fields if they are empty.

Member:

Could you instead store the group metadata hash along with / inside ConfiguredTopology, and rebuild configuredTopology not only when it is empty, but also when the group metadata hash does not match the current group metadata hash?

Also, that would mean that when we call setTopology or setPartitionMetadata inside StreamsGroup, we will not call maybeUpdateConfiguredTopology, since it will only be created in one place: from streamsGroupHeartbeat.

Member Author (FrankYang0529):

We may not be able to keep the metadata hash in ConfiguredTopology. When replaying StreamsGroupMetadataValue, we need to set metadataHash back on the group. If we put it in ConfiguredTopology instead, replay would leave a non-empty ConfiguredTopology whose metadataHash may already equal the latest computed value, so we would have no value to compare against when deciding whether a new ConfiguredTopology needs to be computed.

Member:

Ah, I did not mean it as a replacement for the TimelineLong inside StreamsGroup. We need an additional hash stored inside ConfiguredTopology, to remember for which state of the topics it was calculated.
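
A minimal sketch of the arrangement being suggested, with simplified stand-in types (StreamsGroupStandIn, needsReconfiguration and maybeReconfigure below are hypothetical; only ConfiguredTopology, metadataHash and the TimelineLong-backed group hash come from the discussion): the group keeps its own metadata hash, ConfiguredTopology remembers the hash it was computed for, and the heartbeat rebuilds the topology when it is missing or the two values disagree.

import java.util.Optional;

final class TwoHashSketch {

    // The derived topology remembers the metadata hash it was computed for.
    record ConfiguredTopology(long computedForMetadataHash) { }

    // Simplified stand-in for StreamsGroup: the TimelineLong-backed hash is modelled
    // as a plain long here, restored when StreamsGroupMetadataValue is replayed.
    static final class StreamsGroupStandIn {
        long metadataHash;
        Optional<ConfiguredTopology> configuredTopology = Optional.empty();
    }

    static boolean needsReconfiguration(StreamsGroupStandIn group) {
        return group.configuredTopology.isEmpty()
            || group.configuredTopology.get().computedForMetadataHash() != group.metadataHash;
    }

    // Heartbeat-only rebuild point; the setters on the group no longer trigger this.
    static void maybeReconfigure(StreamsGroupStandIn group) {
        if (needsReconfiguration(group)) {
            // Stand-in for the InternalTopicManager.configureTopics call.
            group.configuredTopology = Optional.of(new ConfiguredTopology(group.metadataHash));
        }
    }

    public static void main(String[] args) {
        StreamsGroupStandIn group = new StreamsGroupStandIn();
        group.metadataHash = 7L;                          // as if replayed from the log
        maybeReconfigure(group);                          // empty cache, so it is rebuilt
        group.metadataHash = 8L;                          // topic metadata changed
        System.out.println(needsReconfiguration(group));  // true: hashes now disagree
    }
}

With this split, replaying StreamsGroupMetadataValue only restores the group's own hash, and the staleness check in the heartbeat still has two distinct values to compare.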

FrankYang0529 marked this pull request as ready for review May 26, 2025 15:03