
KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group #19802


Merged — 7 commits merged into apache:trunk from the KAFKA-17747-5 branch on Jun 3, 2025

Conversation

@FrankYang0529 (Member, Author) commented May 24, 2025:

  • Use a metadata hash to replace subscription metadata.
  • Remove StreamsGroupPartitionMetadataKey and
    StreamsGroupPartitionMetadataValue.
  • Check whether configuredTopology is empty. If it is, call
    InternalTopicManager.configureTopics and set the result to the group
    (see the sketch below).

Reviewers: Lucas Brutschy [email protected]
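
A minimal sketch of the third bullet, assuming hypothetical accessor names (`configuredTopology()`, `setConfiguredTopology()`) and an illustrative `configureTopics` signature; this is not the actual implementation:

```java
// Sketch only: accessor names and the configureTopics signature here are
// assumptions, not the actual Kafka code.
private void maybeConfigureTopology(StreamsGroup group,
                                    Map<String, TopicMetadata> topicMetadata) {
    // configuredTopology is derived state: rebuild it only when the cached
    // value is absent (e.g. on the first heartbeat after failover).
    if (group.configuredTopology().isEmpty()) {
        group.setConfiguredTopology(
            InternalTopicManager.configureTopics(logContext, group.topology(), topicMetadata));
    }
}
```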

Commit (verified): …am group

Signed-off-by: PoAn Yang <[email protected]>
```java
this.partitionMetadata.clear();
this.partitionMetadata.putAll(partitionMetadata);
maybeUpdateConfiguredTopology();
maybeUpdateGroupState();
```
@FrankYang0529 (Member, Author) commented:

To implement KIP-1101 in the streams group, we have to remove StreamsGroupPartitionMetadataKey and StreamsGroupPartitionMetadataValue. This saves disk storage, and the group coordinator can still trigger a rebalance when the metadata hash changes (a sketch of this trigger follows below).

In a streams group, the configured topology is recalculated and the group state updated whenever partitionMetadata or topology data changes. I wonder why configuredTopology and state aren't saved to records, so they could be recovered when replaying records?

cc @lucasbru @bbejeck @chia7712
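
To make the first paragraph concrete, here is a hedged sketch of the hash-based trigger; the helper names (`computeMetadataHash`, `newStreamsGroupEpochRecord`) are assumptions:

```java
// Hedged sketch: instead of persisting per-partition metadata records, the
// coordinator stores a single hash of the subscribed topics' metadata and
// bumps the group epoch when it changes.
long computedHash = computeMetadataHash(subscribedTopicNames, metadataImage); // assumed helper
if (computedHash != group.metadataHash()) {
    // Topic metadata changed (e.g. partitions added), so bump the group
    // epoch; the epoch bump is what triggers the rebalance.
    int newGroupEpoch = group.groupEpoch() + 1;
    records.add(newStreamsGroupEpochRecord(groupId, newGroupEpoch, computedHash)); // assumed helper
}
```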

@lucasbru (Member) replied:

ConfiguredTopology can be derived completely from the records for PartitionMetadata and Topology. So we just need to cache it, and we can recreate it upon the first heartbeat after failover. We could consider persisting it, but that would just mean storing somewhat duplicate data. The state is also derived from the other records.
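
In other words (a hedged sketch; the replay-handler shape and helpers are assumptions): replaying the persisted records restores only the inputs, while the derived fields stay empty until the first heartbeat recomputes them.

```java
// Sketch of the replay path implied above. Replaying the topology record
// restores the input state; the derived configuredTopology is intentionally
// not persisted and stays empty here.
public void replay(String groupId, StreamsGroupTopologyValue value) {
    StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId); // assumed helper
    group.setTopology(StreamsTopology.fromRecord(value));       // assumed converter
    // configuredTopology and the group state are recomputed lazily on the
    // first heartbeat after failover.
}
```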

@FrankYang0529 (Member, Author) replied:

Thanks for the reply. I will keep working on this PR and change streamsGroupHeartbeat to rebuild these two fields if they are empty.

@lucasbru (Member) commented:

Could you instead store the group metadata hash along with / inside ConfiguredTopology, and rebuild configuredTopology not only when it is empty, but also when its stored hash does not match the current group metadata hash?

Also, that would mean that when we call setTopology or setPartitionMetadata inside StreamsGroup, we will not call maybeUpdateConfiguredTopology, since it will only be created in one place: from streamsGroupHeartbeat.
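
A hedged sketch of this suggestion (accessor names assumed): the single rebuild site in streamsGroupHeartbeat checks both emptiness and hash mismatch.

```java
// Sketch only; names and signatures are assumptions. Rebuild when there is
// no cached topology, or when it was computed for a different metadata hash.
boolean stale = group.configuredTopology().isEmpty()
    || group.configuredTopology().get().metadataHash() != group.metadataHash();
if (stale) {
    group.setConfiguredTopology(
        InternalTopicManager.configureTopics(logContext, group.metadataHash(),
            group.topology(), topicMetadata)); // illustrative signature
}
```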

@FrankYang0529 (Member, Author) replied:

We may not be able to set the metadata hash in ConfiguredTopology. When replaying StreamsGroupMetadataValue, we need to set metadataHash back. If we add it to ConfiguredTopology, the group would end up with a non-empty ConfiguredTopology whose metadataHash may equal the latest computed value, and then we would have no value to compare against to decide whether a new ConfiguredTopology needs to be computed.

@lucasbru (Member) replied:

Ah, I did not mean it as a replacement for the TimelineLong inside StreamsGroup. We need an additional hash stored inside ConfiguredTopology, to remember which state of the topics it was calculated for.
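
So there are two hashes (a hedged sketch; field shapes are assumptions): the timeline-tracked hash on the group, and a snapshot of it inside ConfiguredTopology recording which topic state the topology was computed from.

```java
// Inside StreamsGroup (simplified, assumed fields):
// private final TimelineLong metadataHash;          // authoritative current hash
// private Optional<ConfiguredTopology> configuredTopology = Optional.empty();

// ConfiguredTopology snapshots the hash it was computed from; if it differs
// from the group's current hash, the cached topology is stale.
public record ConfiguredTopology(long metadataHash,
                                 Map<String, ConfiguredSubtopology> subtopologies) { }
```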

@FrankYang0529 (Member, Author) replied:

@lucasbru, I added metadataHash to ConfiguredTopology and kept the TimelineLong in StreamsGroup. Could you review again when you have time? Thank you.

Commits (verified): …dataValue

Signed-off-by: PoAn Yang <[email protected]>
@FrankYang0529 marked this pull request as ready for review on May 26, 2025 15:03

Commits (verified)
Signed-off-by: PoAn Yang <[email protected]>
@FrankYang0529 requested a review from lucasbru on May 30, 2025 05:46
@FrankYang0529 changed the title from "KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip)" to "KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group" on May 30, 2025
@lucasbru (Member) left a review:

Mostly looking good to me! Just very minor comments on the testing side. I also just kicked off a system test run to check if things are still working as intended (but they should).

```java
)
.build();

context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
```
@lucasbru (Member) commented:

Same here. If possible, let's make the ConfiguredTopology explicit here.
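
A hedged illustration of what "explicit" could look like in the test; the constructor arguments and expected values are hypothetical, based only on the surrounding context:

```java
// Sketch only: build the expected ConfiguredTopology explicitly and assert
// equality, rather than checking an implicit/derived value.
ConfiguredTopology expected = new ConfiguredTopology(
    expectedMetadataHash,               // assumed field
    Map.of("0", expectedSubtopology));  // hypothetical contents
assertEquals(Optional.of(expected),
    context.groupMetadataManager.getStreamsGroupOrThrow(groupId).configuredTopology());
```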

```java
)
.build();

context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
```
@lucasbru (Member) commented:

Same as above.

```java
)
.build();

context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
```
@lucasbru (Member) commented:

Same as above.

```java
)
.build();

context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
```
@lucasbru (Member) commented:

Same as above.

```java
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)
));
context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
```
@lucasbru (Member) commented:

Same as above.

@lucasbru added the KIP-1071 (PRs related to KIP-1071) label on Jun 2, 2025

Commit (verified)
@lucasbru (Member) commented Jun 3, 2025:

https://confluent-open-source-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/trunk/2025-06-02--001.c311e62d-9e3f-4594-b4e6-3fac3fa04638--1748907426--FrankYang0529--KAFKA-17747-5--1cc843467c/report.html

System test results look good; the failing tests are due to a problem already fixed on trunk.

Commit (verified)
Signed-off-by: PoAn Yang <[email protected]>
@lucasbru (Member) left a review:

LGTM, thanks

@lucasbru merged commit 425f028 into apache:trunk on Jun 3, 2025
24 checks passed
@FrankYang0529 deleted the KAFKA-17747-5 branch on June 3, 2025 11:45