KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol at Server Side.#21565

Open
chickenchickenlove wants to merge 8 commits into apache:trunk from chickenchickenlove:KAFKA-20169-server

Conversation

@chickenchickenlove (Contributor) commented Feb 24, 2026

Introduction

This PR enables "static membership for Streams group heartbeat" and aligns group-epoch behavior with the KIP-1071 intent for Streams rebalancing.

Notification

  1. According to KIP-1071, the group epoch must be bumped only when one of the following member attributes changes: topology epoch, rack ID, client tags, or process ID. However, the previous implementation determined whether to bump the group epoch by comparing the entire member record. In this PR, the static membership path is updated so that the group epoch is bumped only when at least one of topology epoch, rack ID, client tags, or process ID differs.
  2. When a static member leaves with member epoch -2 and later rejoins with the same instance ID, we treat this as not constituting a member join/leave for the purpose of group-epoch bumping. Accordingly, the implementation does not bump the group epoch in this case, since it does not satisfy the “member join/leave” condition described in KIP-1071.
KIP-1071 says:
The group epoch is bumped:
- When a member joins or leaves the group.
- When a member is fenced or removed from the group by the group coordinator.
- When the partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.
- When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
- When a member updates its topology metadata, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
- When an assignment configuration for the group is updated.

Changes

  • Removed service-layer rejection of static membership in Streams heartbeat.
  • Added static-member validation paths in Streams coordinator logic:
    • unreleased instance id
    • fenced instance id
    • unknown static member
  • Added static Streams member subscribe/replace flow:
    • supports join of new static member
    • supports rejoin replacement with new memberId for same instanceId
    • writes replacement records (tombstone old + create new)
  • Added static leave handling for Streams:
    • -2 (temporary leave): keep static identity and write current-assignment epoch -2
    • -1 (actual leave): fence/remove member and bump group epoch
  • Updated max-size handling:
    • If instanceId already maps to an existing static member, skip max-size rejection for replacement flow
  • Updated assignment continuity for static rejoin:
    • when needed, resolve target assignment from previous static member id
  • Updated epoch bump decision for static rejoin:
    • do not bump on member-id change alone
    • bump only on epoch-relevant metadata changes (topology epoch, rack id, client tags, process id)
  • Added helper builder support for cloning Streams member with a new member id.
  • Added utility method to check whether a static member mapping is currently valid.
  • Added test cases.
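The static-leave handling described above (epoch -2 vs. -1) can be sketched roughly as follows. `LeaveResult` and `handleStaticLeave` are illustrative names for this sketch, not the PR's actual types; the real coordinator writes records rather than returning an enum:

```java
// Minimal sketch of the static-leave dispatch described in the Changes list.
// LeaveResult and handleStaticLeave are illustrative names, not the PR's code.
public class StaticLeaveSketch {

    public enum LeaveResult { TEMPORARY_LEAVE, ACTUAL_LEAVE, INVALID }

    // -2: the member leaves temporarily; its static identity is kept so a
    //     process with the same instance ID can rejoin and take over.
    // -1: the member leaves for good; it is fenced/removed and the group
    //     epoch is bumped so its tasks can be reassigned.
    public static LeaveResult handleStaticLeave(int memberEpoch) {
        if (memberEpoch == -2) {
            return LeaveResult.TEMPORARY_LEAVE;
        } else if (memberEpoch == -1) {
            return LeaveResult.ACTUAL_LEAVE;
        }
        return LeaveResult.INVALID;
    }

    public static void main(String[] args) {
        System.out.println(handleStaticLeave(-2)); // TEMPORARY_LEAVE
        System.out.println(handleStaticLeave(-1)); // ACTUAL_LEAVE
    }
}
```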

Scope

  • Client-side changes are out of scope. (The client-side implementation will be addressed in a separate PR.)

Related PR

@github-actions github-actions bot added triage PRs from the community group-coordinator labels Feb 24, 2026
@chickenchickenlove (Contributor, Author)

Hi, @lucasbru !
I’ve implemented the server-side logic for static members in KIP-1071!
I’ll split the client-side support into a separate PR and implement it soon.
When you have bandwidth, could you take a look?

@squah-confluent (Contributor) left a comment

Thanks for the patch!

Comment on lines +503 to +516
/**
 * @return Whether the given instance ID is currently associated with an existing static member.
 */
public boolean staticMemberExist(String instanceId) {
    String memberId = staticMembers.get(instanceId);
    if (memberId == null)
        return false;

    return members.get(memberId) != null;
}

Contributor

Could we follow the same pattern as ConsumerGroup.hasStaticMember?
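For reference, a `hasStaticMember`-style check along these lines could look like the following sketch. The field name mirrors the snippet above; this is an assumption about the pattern being suggested, not the actual `ConsumerGroup` code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a hasStaticMember-style check; staticMembers maps
// instanceId -> memberId as in the snippet above. Illustrative only.
public class StaticMemberIndex {
    private final Map<String, String> staticMembers = new HashMap<>();

    public void put(String instanceId, String memberId) {
        staticMembers.put(instanceId, memberId);
    }

    public boolean hasStaticMember(String instanceId) {
        if (instanceId == null) {
            return false;
        }
        return staticMembers.containsKey(instanceId);
    }

    public static void main(String[] args) {
        StaticMemberIndex index = new StaticMemberIndex();
        index.put("instance-1", "member-a");
        System.out.println(index.hasStaticMember("instance-1")); // true
        System.out.println(index.hasStaticMember(null));         // false
    }
}
```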

Contributor (Author)

@squah-confluent
It makes sense to me!

This method was added to prevent an unintended GroupMaxSizeReachedException from being thrown when a static member joins within throwsIfStreamsGroupsIsFull.

Unlike ConsumerGroup, StreamsGroup manages members and staticMembers separately. However, given the purpose above, it does not seem necessary to verify whether there is an actual member currently bound to the static member.

Even if we assume a case where a static member exists but no corresponding member exists, the impact on GroupMaxSizeReachedException remains unchanged.

Comment on lines +2166 to +2176
if (instanceId == null) {
    targetAssignmentEpoch = group.assignmentEpoch();
    targetAssignment = group.targetAssignment(updatedMember.memberId());
} else {
    targetAssignmentEpoch = group.assignmentEpoch();
    StreamsGroupMember maybeOldStaticMember = group.staticMember(instanceId);
    String maybeOldStaticMemberId = maybeOldStaticMember == null ?
        updatedMember.memberId() :
        maybeOldStaticMember.memberId();
    targetAssignment = group.targetAssignment(maybeOldStaticMemberId);
}
Contributor

In consumer groups, this is written as

            targetAssignmentEpoch = group.assignmentEpoch();
            targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());

Could we follow the same pattern?
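A sketch of what such an overload could do — resolve the previous static member's ID when an instance ID is present — assuming a `staticMembers` map from instance ID to member ID. All names here are illustrative, not the actual `ConsumerGroup` implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a targetAssignment(memberId, instanceId) overload that falls
// back to the previous static member's assignment. Names are illustrative.
public class TargetAssignmentSketch {
    private final Map<String, String> staticMembers = new HashMap<>();     // instanceId -> memberId
    private final Map<String, String> targetAssignments = new HashMap<>(); // memberId -> assignment

    public String targetAssignment(String memberId) {
        return targetAssignments.getOrDefault(memberId, "EMPTY");
    }

    public String targetAssignment(String memberId, String instanceId) {
        if (instanceId == null) {
            return targetAssignment(memberId);
        }
        // A rejoining static member gets the assignment of the member ID
        // previously bound to its instance ID, if any.
        String existingMemberId = staticMembers.get(instanceId);
        return targetAssignment(existingMemberId != null ? existingMemberId : memberId);
    }

    public static void main(String[] args) {
        TargetAssignmentSketch group = new TargetAssignmentSketch();
        group.staticMembers.put("instance-1", "old-member");
        group.targetAssignments.put("old-member", "tasks 0,1");
        System.out.println(group.targetAssignment("new-member", "instance-1")); // tasks 0,1
        System.out.println(group.targetAssignment("new-member", null));         // EMPTY
    }
}
```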

*
* @return A CoordinatorResult with a single record signifying that the static member is leaving.
*/
private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupStaticMemberGroupLeave(
Contributor

@lucasbru Do we need to reset the assignment epochs of tasks here?

Contributor (Author)

Sorry for the confusion. 🙇‍♂️
FYI, this is why I implemented it that way. (AFAIK, KIP-1071 does not define the behavior for this case.)

My understanding is that the coordinator tracks pendingRevocationTasks and uses them to determine whether a task is still owned or not. pendingRevocationTasks represent tasks that the member is expected to revoke and report as revoked in the next heartbeat response.

However, when a static member terminates with epoch -2, we can generally assume the Streams application has shut down. In that case, even if we never receive a follow-up heartbeat response, it is reasonable to treat the revocation as completed.

Therefore, by writing pendingRevocationTasks as EMPTY, another member that is waiting to acquire those tasks can be assigned them immediately, without having to wait for the static member that left with -2 to come back.
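The reasoning above amounts to something like the following sketch, which clears the pending-revocation set on a -2 leave. The names are illustrative; the real coordinator writes records rather than returning sets:

```java
import java.util.Collections;
import java.util.Set;

// Sketch: on a -2 (temporary static) leave, treat revocation as complete
// so waiting members can pick the tasks up immediately. Illustrative only.
public class RevocationSketch {
    public static Set<Integer> pendingRevocationAfterLeave(Set<Integer> pendingRevocationTasks, int memberEpoch) {
        if (memberEpoch == -2) {
            // The Streams application is assumed shut down; no follow-up
            // heartbeat will confirm the revocation, so treat it as done.
            return Collections.emptySet();
        }
        return pendingRevocationTasks;
    }

    public static void main(String[] args) {
        System.out.println(pendingRevocationAfterLeave(Set.of(1, 2), -2)); // []
    }
}
```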

What do you think?

@github-actions github-actions bot removed the triage PRs from the community label Mar 2, 2026
Comment on lines +8989 to +8996
static boolean hasEpochRelevantMemberConfigChanged(
    StreamsGroupMember oldMember,
    StreamsGroupMember newMember
) {
    // The group epoch is bumped (KIP-1071):
    // - When a member updates its topology metadata, rack ID, client tags or process ID.
    return !Objects.equals(oldMember.topologyEpoch(), newMember.topologyEpoch())
        || !Objects.equals(oldMember.rackId(), newMember.rackId())
        || !Objects.equals(oldMember.clientTags(), newMember.clientTags())
        || !Objects.equals(oldMember.processId(), newMember.processId());
}
Contributor (Author)

@lucasbru @squah-confluent

I’d also like to discuss this.
In KIP-1071, it explicitly states that the group epoch should be bumped only in those specific cases.

The group epoch is bumped:

- When a member joins or leaves the group.
- When a member is fenced or removed from the group by the group coordinator.
- When the partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.
- When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
- When a member updates its topology metadata, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
- When an assignment configuration for the group is updated.

However, the current code seems to bump the group epoch under broader conditions than those KIP-1071 describes.

On the other hand, KIP-1071 also includes the following sentence:

Updates information of the member if needed. The group epoch is incremented if there is any change.

So there appears to be a conflict between the explicitly listed conditions for bumping the group epoch and the statement that the group epoch is incremented whenever anything changes. I think we should resolve this inconsistency. Once it’s clarified, we may need to adjust this method and the call sites that depend on it.

What do you think?
If I’m misunderstanding anything, please let me know! 🙇‍♂️

Member

I think the difference is mainly in configs that are always fixed in Kafka Streams clients

  • rebalance.timeout.ms
  • user.endpoint
  • client ID
  • client Host

So it did not make a difference before. But you are right that with static members, this does make a difference since we can restart the client without a rebalance.

But I think it would be fine to then always call this method, even for dynamic members.

@chickenchickenlove (Contributor, Author) commented Mar 3, 2026

Understood!
Would it make sense to split this into a separate PR so we can keep the scope focused on static membership?
Which would you prefer?

@mjsax (Member) commented Mar 11, 2026

I did not review this PR, but I just had a question / comment about this. If this PR does address this already, great, but based on the comments (which I skimmed over) I believe the PR might not do this.

For static group membership, when a member re-joins, there are a couple of corner cases for which it is not enough to just return the previous assignment (or just bump the epoch); we need to do more:

  • application.server config changed -- for this case, we don't need to compute a new task assignment, but we need to ensure that the new application.server config is broadcasted to all other members
  • client.tags config changes -- if standby tasks are enabled (ie, set to > 1), I think we need to re-compute the standby task assignment (not full assignment)
  • rack.id -- this one is a little tricky -- it's not used broker side atm, but it might be used to compute an assignment -- for a static member, I would not really expect it to change, but if it does change, I think we need to re-compute the full assignment
  • for processId I am not sure? -- for dynamic member, we have a test that verifies that we compute a new assignment (not 100% sure why though? -- Maybe Lucas can explain?)
  • are there any other things that could change and also require specific handling?

About Lucas' comment:

I think the difference is mainly in configs that are always fixed in Kafka Streams clients

rebalance.timeout.ms
user.endpoint
client ID
client Host

For static members, when they are stopped and restarted, we cannot guarantee that the configs do not change. So we need to consider this case IMHO -- as a matter of fact, I know explicitly for the application.server config (aka user.endpoint) that users reported an issue in the "classic" protocol where no rebalance was triggered if this config was updated for static members... It was not really possible to fix for the "classic" protocol, but we should ensure that we fix it for 1071.

Of course, bumping the epoch will be required in all these cases (not trying to say otherwise), but what I am saying is that bumping the epoch does not imply, that we need to re-compute the assignment for all cases, nor that it is sufficient to only bump the epoch for all cases.

Btw: these things are very subtle, and we will need to update the documentation accordingly to explain how this all works -- wondering if it might even make sense to update the KIP with it? It's more than an implementation detail IMHO.

Curious to hear what you think about this?

@lucasbru (Member)

@mjsax What you are saying was exactly my point, when I said "So it did not make a difference before. But you are right that with static members, this does make a difference since we can restart the client without a rebalance." And that's also how it was specified in KIP-1071. In the current code, we simplified the logic here since for dynamic members those properties won't change in practice, but we went the safe route to bump the group epoch and reassign if they change (which I guess can only happen in a non-standard implementation of KS).

KIP-1071 clearly specifies which properties need to bump the group epoch, in the first paragraph @chickenchickenlove mentioned. The second one seems like an oversimplification. But yeah, it's good to go through the list of properties again and make sure the KIP is updated correctly (since we have made a bunch of changes). So KIP-1071 specifies:

  • Topology metadata - This doesn't matter yet, since topology metadata is fixed ATM
  • Rack ID - Correct, since it can influence assignment. Note that consumer groups do not rebalance if rack.id of the static member changes. But I would agree we should call the assignor again.
  • Client tags - Correct, as above.
  • Process ID - Correct, this does influence assignment of standby tasks (they cannot be assigned to a process which already owns the active task), so call the assignor again. However, this can be tricky for the assignor, because changing the process ID can mean that our current assignment is now invalid (since one process now owns the active + standby version of a task). This is something we need to test for -- or, as a simplification, we could just disallow process ID changes for static members. I'd like to keep it simple here, and add extra complications when there is a need for it. Wdyt, @mjsax?

So I think the list looks okay. Here are other things that can change without a group epoch bump:

  • user.endpoint -- this does not influence the assignment at all. However, we do need to handle it via the endpointInformationEpoch.
  • client ID -- does not influence assignment.
  • client Host -- does not influence assignment.
  • rebalance.timeout.ms -- does not influence assignment.

So for this particular check, I would say the list in KIP-1071 is good. But yeah, we need to take care to handle user.endpoint and process ID correctly for static members.

@chickenchickenlove (Contributor, Author)

@mjsax @lucasbru
Thank you both for your comments!
I think some of the points you mentioned are already addressed, while others are not yet covered.

For example, cases where assignments need to be recalculated seem to be handled (topology, processId, clientTags, rackId). However, I haven’t verified yet whether changes to application.server in a static member are propagated to other members. I will validate this through test code.

I have one question: in this PR, should we consider scenarios where static and dynamic members coexist? This may be related to online migration, but I’m not sure whether it should be handled within the scope of this PR or addressed as part of the online migration work separately.

If there are any other aspects you think I should consider, please let me know.
I’ll review my code and update both the implementation and the test cases accordingly 🙇‍♂️

@lucasbru (Member)

I have one question: in this PR, should we consider scenarios where static and dynamic members coexist? This may be related to online migration, but I’m not sure whether it should be handled within the scope of this PR or addressed as part of the online migration work separately.

Static and dynamic members can definitely coexist. But this is not really related to online migration.

@chickenchickenlove (Contributor, Author)

Static and dynamic members can definitely coexist.

Thanks for your comments!
Let me check more about this.

@lucasbru (Member)

@chickenchickenlove are you still working on this?

@chickenchickenlove (Contributor, Author)

@lucasbru thanks for your comment!

Yes! Also, I think it will be ready for review once I add some test cases covering a scenario where static and dynamic members coexist. I have not written that scenario yet, but I should be able to add it soon.

If there is anything else you would like me to check before the review, please let me know.

@squah-confluent (Contributor)

When resolving the merge conflict, we must remember to update UpdateTargetAssignmentResult.fromLastTargetAssignment(StreamsGroup, Optional<StreamsGroupMember>) to call the static member overload of StreamsGroup.targetAssignment.

@chickenchickenlove (Contributor, Author)

Thanks @squah-confluent !
I’ll resolve the conflicts by rebasing onto trunk or merging trunk into my branch, and then incorporate your feedback 🙇‍♂️

@lucasbru (Member)

We should remember to update tests/kafkatest/tests/streams/streams_static_membership_test.py to include the new protocol.

@chickenchickenlove (Contributor, Author)

While rebasing onto trunk, I found that the diff had become too large for me to confidently carry the rebase through correctly. So I decided not to continue with the rebase onto trunk, and instead proceeded as follows.

  1. I created a new branch, branch A, based on the current trunk, and reapplied the previous commits one by one.
  2. I then replaced KAFKA-20169-server with branch A and pushed it using --force-with-lease.

In the process, the previous commit history was removed. I’m sorry that this also removed the parts you had already taken the time to review.

I believe this PR will be ready for review once I complete the following remaining items, and I will ping you again once they are done.

  • Add a test case covering the scenario where dynamic and static members coexist
  • Add code to tests/kafkatest/tests/streams/streams_static_membership_test.py

@chickenchickenlove (Contributor, Author)

I added the client-side implementation here. (Originally, it was implemented in #21603.)
In that PR, I left these comments.

Hi, m1a2st!
I’m currently working on implementing KAFKA-20169, which adds Static Membership support for StreamsGroup.

During the client-side work, I realized I had to implement part of what’s being discussed in KIP-1284. Specifically, for static members, the client should be able to leave with epoch -2, but at the moment it always leaves with -1. Because of that, I had to adjust a portion of the code related to close(CloseOptions).
...

Given this, we may need to wait until KIP-1284 is incorporated.

@chickenchickenlove (Contributor, Author)

About tests/kafkatest/tests/streams/streams_static_membership_test.py: I'm not familiar with this or with ducktape, and there is no streams group protocol example in tests/kafkatest/services/streams.py. For that reason, it may take me some time to put together the appropriate test cases, and I would appreciate your understanding. 🙇‍♂️

@lucasbru (Member)

@chickenchickenlove yes, we are close to accepting KIP-1284, hopefully it's going to be implemented soon. I'll push some other committer to look at it as well.

As for streams_static_membership_test, have a look at streams_smoke_test. It runs for both the old and the new protocol.

@chickenchickenlove (Contributor, Author)

@lucasbru
I added ducktape test code for static membership of the streams group.
I think this PR is ready to be reviewed.
When you get a chance, please take a look. 🙇‍♂️
