KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol at Server Side (#21565)
Conversation
Hi, @lucasbru !

squah-confluent left a comment:

Thanks for the patch!
```java
/**
 * @return Returns whether the given instance ID is currently associated with an existing static member.
 */
public boolean staticMemberExist(String instanceId) {
    String memberId = staticMembers.get(instanceId);
    if (memberId == null)
        return false;

    StreamsGroupMember member = members.get(memberId);
    if (member == null)
        return false;
    return true;
}
```
Could we follow the same pattern as ConsumerGroup.hasStaticMember?
@squah-confluent
It makes sense to me!
This method was added to prevent an unintended GroupMaxSizeReachedException from being thrown in throwsIfStreamsGroupsIsFull when a static member rejoins.
Unlike ConsumerGroup, StreamsGroup manages members and staticMembers separately. However, given the purpose above, it does not seem necessary to verify that an actual member is currently bound to the static member.
Even if a static member exists without a corresponding member, the impact on GroupMaxSizeReachedException is unchanged.
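As a minimal sketch of the pattern being suggested, assuming ConsumerGroup.hasStaticMember reduces the existence check to a key lookup on the instanceId-to-memberId map (the class and map below are illustrative stand-ins, not the actual coordinator types):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: following the hasStaticMember pattern, the existence
// check only consults the instanceId -> memberId map, without resolving the
// member object itself.
public class StaticMemberCheck {
    private final Map<String, String> staticMembers = new HashMap<>();

    public void put(String instanceId, String memberId) {
        staticMembers.put(instanceId, memberId);
    }

    // A null instanceId can never belong to a static member; otherwise
    // membership is just a key lookup, regardless of whether a member
    // object is currently bound to the id.
    public boolean hasStaticMember(String instanceId) {
        if (instanceId == null)
            return false;
        return staticMembers.containsKey(instanceId);
    }

    public static void main(String[] args) {
        StaticMemberCheck group = new StaticMemberCheck();
        group.put("instance-1", "member-a");
        System.out.println(group.hasStaticMember("instance-1")); // true
        System.out.println(group.hasStaticMember("instance-2")); // false
        System.out.println(group.hasStaticMember(null));         // false
    }
}
```

This matches the point above: for the max-size check, only the mapping's existence matters, not the bound member.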
```java
if (instanceId == null) {
    targetAssignmentEpoch = group.assignmentEpoch();
    targetAssignment = group.targetAssignment(updatedMember.memberId());
} else {
    targetAssignmentEpoch = group.assignmentEpoch();
    StreamsGroupMember maybeOldStaticMember = group.staticMember(instanceId);
    String maybeOldStaticMemberId = maybeOldStaticMember == null ?
        updatedMember.memberId() :
        maybeOldStaticMember.memberId();
    targetAssignment = group.targetAssignment(maybeOldStaticMemberId);
}
```
In consumer groups, this is written as

```java
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
```

Could we follow the same pattern?
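A hedged sketch of what such an overload could look like, with simple String stand-ins for the real assignment and member types (names here are illustrative, not the actual group coordinator API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of resolving the target assignment through the
// instance ID, so a member replacing an existing static member inherits
// that member's assignment.
public class TargetAssignmentLookup {
    private final Map<String, String> staticMembers = new HashMap<>();      // instanceId -> memberId
    private final Map<String, String> targetAssignments = new HashMap<>();  // memberId -> assignment

    public void addStaticMember(String instanceId, String memberId) {
        staticMembers.put(instanceId, memberId);
    }

    public void setTargetAssignment(String memberId, String assignment) {
        targetAssignments.put(memberId, assignment);
    }

    public String targetAssignment(String memberId) {
        return targetAssignments.getOrDefault(memberId, "EMPTY");
    }

    // Overload mirroring the consumer-group pattern: if the instance ID maps
    // to an existing static member, return that member's assignment so the
    // replacement inherits it; otherwise fall back to the member's own.
    public String targetAssignment(String memberId, String instanceId) {
        if (instanceId != null) {
            String oldMemberId = staticMembers.get(instanceId);
            if (oldMemberId != null) {
                return targetAssignment(oldMemberId);
            }
        }
        return targetAssignment(memberId);
    }

    public static void main(String[] args) {
        TargetAssignmentLookup group = new TargetAssignmentLookup();
        group.addStaticMember("instance-1", "old-member");
        group.setTargetAssignment("old-member", "tasks 0,1");
        // A new member id replacing instance-1 inherits the old assignment.
        System.out.println(group.targetAssignment("new-member", "instance-1")); // tasks 0,1
        // A dynamic member (null instance id) resolves its own assignment.
        System.out.println(group.targetAssignment("new-member", null)); // EMPTY
    }
}
```

This keeps the branching inside the lookup, so the caller collapses to the two lines quoted above.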
```java
 *
 * @return A CoordinatorResult with a single record signifying that the static member is leaving.
 */
private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupStaticMemberGroupLeave(
```
@lucasbru Do we need to reset the assignment epochs of tasks here?
Sorry for the confusion. 🙇♂️
Just FYI, this is why I implemented it that way. (AFAIK, KIP-1071 does not define the behavior for this case.)
My understanding is that the coordinator tracks pendingRevocationTasks and uses them to determine whether a task is still owned or not. pendingRevocationTasks represent tasks that the member is expected to revoke and report as revoked in the next heartbeat response.
However, when a static member terminates with epoch -2, we can generally assume the Streams application has shut down. In that case, even if we never receive a follow-up heartbeat response, it is reasonable to treat the revocation as completed.
Therefore, by writing pendingRevocationTasks as EMPTY, another member that is waiting to acquire those tasks can be assigned them immediately, without having to wait for the static member that left with -2 to come back.
What do you think?
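The reasoning above can be sketched with plain sets (this is an illustration of the argument, not the actual coordinator code): a task is only assignable to another member once it is no longer pending revocation, so clearing pendingRevocationTasks on an epoch -2 leave frees the tasks immediately instead of waiting for a heartbeat that a shut-down client will never send.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the pendingRevocationTasks reasoning above.
public class RevocationSketch {
    private final Set<Integer> pendingRevocationTasks = new HashSet<>();

    public void markPendingRevocation(int task) {
        pendingRevocationTasks.add(task);
    }

    // A task still pending revocation cannot yet be handed to another member.
    public boolean isAssignableToOthers(int task) {
        return !pendingRevocationTasks.contains(task);
    }

    // Static member left with epoch -2: assume the Streams client shut down
    // and treat all of its revocations as completed.
    public void onStaticMemberTemporaryLeave() {
        pendingRevocationTasks.clear();
    }

    public static void main(String[] args) {
        RevocationSketch s = new RevocationSketch();
        s.markPendingRevocation(0);
        System.out.println(s.isAssignableToOthers(0)); // false: still pending
        s.onStaticMemberTemporaryLeave();
        System.out.println(s.isAssignableToOthers(0)); // true: freed immediately
    }
}
```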
```java
static boolean hasEpochRelevantMemberConfigChanged(
    StreamsGroupMember oldMember,
    StreamsGroupMember newMember
) {
    // The group epoch is bumped: (KIP-1071)
    // - When a member updates its topology metadata, rack ID, client tags or process ID.
    return !Objects.equals(oldMember.topologyEpoch(), newMember.topologyEpoch())
        || !Objects.equals(oldMember.rackId(), newMember.rackId())
```
I’d also like to discuss this.
In KIP-1071, it explicitly states that the group epoch should be bumped only in those specific cases.
The group epoch is bumped:
- When a member joins or leaves the group.
- When a member is fenced or removed from the group by the group coordinator.
- When the partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.
- When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
- When a member updates its topology metadata, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
- When an assignment configuration for the group is updated.
However, the current code seems to evaluate the group-epoch bump condition more strictly than what KIP-1071 describes.
On the other hand, KIP-1071 also includes the following sentence:
Updates information of the member if needed. The group epoch is incremented if there is any change.
So there appears to be a conflict between the explicitly listed conditions for bumping the group epoch and the statement that the group epoch is incremented whenever anything changes. I think we should resolve this inconsistency. Once it’s clarified, we may need to adjust this method and the call sites that depend on it.
What do you think?
If I’m misunderstanding anything, please let me know! 🙇♂️
I think the difference is mainly in configs that are always fixed in Kafka Streams clients
- rebalance.timeout.ms
- user.endpoint
- client ID
- client Host
So it did not make a difference before. But you are right that with static members, this does make a difference since we can restart the client without a rebalance.
But I think it would be fine to then always call this method, even for dynamic members.
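The quoted snippet is cut off, so as a hedged sketch, the full predicate restricted to the KIP-1071 list (topology metadata, rack ID, client tags, process ID) could look like the following; the Member record is a stand-in, not the real StreamsGroupMember:

```java
import java.util.Map;
import java.util.Objects;

// Illustrative completion of the truncated predicate quoted above.
public class EpochBumpCheck {
    // Stand-in for StreamsGroupMember with just the epoch-relevant fields.
    record Member(int topologyEpoch, String rackId, Map<String, String> clientTags, String processId) {}

    // KIP-1071: the group epoch is bumped when a member updates its topology
    // metadata, rack ID, client tags or process ID.
    static boolean hasEpochRelevantMemberConfigChanged(Member oldMember, Member newMember) {
        return oldMember.topologyEpoch() != newMember.topologyEpoch()
            || !Objects.equals(oldMember.rackId(), newMember.rackId())
            || !Objects.equals(oldMember.clientTags(), newMember.clientTags())
            || !Objects.equals(oldMember.processId(), newMember.processId());
    }

    public static void main(String[] args) {
        Member a = new Member(1, "rack-1", Map.of("zone", "a"), "p1");
        Member b = new Member(1, "rack-2", Map.of("zone", "a"), "p1");
        System.out.println(hasEpochRelevantMemberConfigChanged(a, a)); // false
        System.out.println(hasEpochRelevantMemberConfigChanged(a, b)); // true: rack ID changed
    }
}
```

Per the comment above, this check could then be applied uniformly to both static and dynamic members.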
Understood!
Would it make sense to split this into a separate PR so we can keep the scope focused on static membership?
Which would you prefer?
I did not review this PR, but I just had a question / comment about this. If this PR does address this already, great, but based on the comments (which I skimmed over) I believe the PR might not do this. For static group membership, when a member re-joins, there are a couple of corner cases for which it is not enough to just return the previous assignment (or just bump the epoch), but we need to do more:

About Lucas' comment: For static members, when they are stopped and restarted, we cannot guarantee that the configs do not change. So we need to consider this case IMHO -- as a matter of fact, I know explicitly for

Of course, bumping the epoch will be required in all these cases (not trying to say otherwise), but what I am saying is that bumping the epoch does not imply that we need to re-compute the assignment in all cases, nor that it is sufficient to only bump the epoch in all cases.

Btw: these things are very subtle, and we will need to update the documentation accordingly to explain how this all works -- wondering if it might even make sense to update the KIP with it? It's more than an implementation detail IMHO. Curious to hear what you think about this?
@mjsax What you are saying was exactly my point when I said "So it did not make a difference before. But you are right that with static members, this does make a difference since we can restart the client without a rebalance." And that's also how it was specified in KIP-1071.

In the current code, we simplified the logic here, since for dynamic members those properties won't change in practice, but we went the safe route to bump the group epoch and reassign if they change (which I guess can only happen in a non-standard implementation of KS). KIP-1071 clearly specifies which properties need to bump the group epoch, in the first paragraph @chickenchickenlove mentioned. The second one seems like an oversimplification. But yeah, it's good to go through the list of properties again and make sure the KIP was updated correctly (since we have made a bunch of changes). So KIP-1071 specifies

So I think the list looks okay. Here are other things that can change without a group epoch bump:

So for this particular check, I would consider the list in KIP-1071 good. But yeah, we need to take care to handle
@mjsax @lucasbru For example, cases where assignments need to be recalculated seem to be handled (topology, processId, clientTags, rackId). However, I haven't verified yet whether changes to application.server in a static member are propagated to other members. I will validate this through test code.

I have one question: in this PR, should we consider scenarios where static and dynamic members coexist? This may be related to online migration, but I'm not sure whether it should be handled within the scope of this PR or addressed as part of the online migration work separately. If there are any other aspects you think I should consider, please let me know.
Static and dynamic members can definitely coexist. But this is not really related to online migration.
Thanks for your comments!
@chickenchickenlove are you still working on this?
@lucasbru thanks for your comment! Yes! Also, I think it will be ready for review once I add some test cases covering a scenario where static and dynamic members coexist. I have not written that scenario yet, but I should be able to add it soon. If there is anything else you would like me to check before the review, please let me know!
When resolving the merge conflict, we must remember to update tests/kafkatest/tests/streams/streams_static_membership_test.py.
Thanks @squah-confluent !
We should remember to update tests/kafkatest/tests/streams/streams_static_membership_test.py to include the new protocol. |
Force-pushed "KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol at Server Side" from 5ba30e6 to f291d80.
While rebasing onto trunk, I found that the diff had become too large for me to confidently carry the rebase through correctly. So I decided not to continue with the rebase onto trunk, and instead proceeded as follows.
In the process, the previous commit history was removed. I’m sorry that this also removed the parts you had already taken the time to review. I believe this PR will be ready for review once I complete the following remaining items, and I will ping you again once they are done.
I added the client-side implementation here. (Originally, it was implemented in #21603.)
Given this, we may need to wait until KIP-1284 is incorporated.
About streams_static_membership_test:
@chickenchickenlove yes, we are close to accepting KIP-1284, hopefully it's going to be implemented soon. I'll push some other committer to look at it as well. As for streams_static_membership_test, have a look at streams_smoke_test. It runs for both the old and the new protocol. |
@lucasbru |
Introduction
This PR enables "static membership for Streams group heartbeat" and aligns group-epoch behavior with the KIP-1071 intent for Streams rebalancing.
Notification
Changes
- Reuse memberId for the same instanceId
- Epoch -2 (temporary leave): keep static identity and write current-assignment epoch -2
- Epoch -1 (actual leave): fence/remove member and bump group epoch
- If instanceId already maps to an existing static member, skip max-size rejection for the replacement flow

Scope
Related PR