Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consumer): incremental cooperative balance strategy (KIP-429) #2608

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

napallday
Copy link
Contributor

@napallday napallday commented Aug 20, 2023

Incremental Cooperative Protocol POC Proposal

I'm excited to present this Proof of Concept (POC) version that supports the incremental cooperative protocol as outlined in KIP-429. The proposed feature addresses a long-standing community demand (#1858). I've conducted several manual tests, and the results have been promising. (Can run examples for a try)

Major Changes

The existing design in Sarama tightly couples with the EAGER protocol, particularly evident in ConsumerGroupSession. In order to accommodate the new COOPERATIVE rebalance protocol, I've introduced the ConsumerGroupHandlerV2 interface and a new method called ConsumeV2.

Seeking Community Guidance

I'm eager to align this work with the community's future direction. Two possible paths are under consideration:

  1. Major Version Update Plan: If Sarama intends to increment the major version while introducing support for the COOPERATIVE rebalance protocol, it would allow us to streamline our implementation. We could eliminate ConsumerGroupSession-related coded from versions v2.x.x, focusing solely on the new approach.

  2. Maintaining Compatibility: Alternatively, if a major version update isn't planned, I'll take steps to enhance future maintainability. This involves refactoring common logic within the Consume and ConsumerV2 methods, paving the way for smoother maintenance.

Upcoming Work

In the pipeline, I plan to incorporate the following enhancements:

  • Integration of metrics for the new protocol.
  • Implementation of unit tests, functional tests, and benchmark tests to ensure reliability.
  • Validation of Pause and Resume functionality within the new protocol.
  • Verification for customized cooperative balance strategy implementation.

I'm looking forward to the community's insights and feedback as we work towards a more efficient and feature-rich Sarama.

@napallday napallday force-pushed the napallday/incremental-cooperative-balance-strategy branch 5 times, most recently from b7a87f1 to 1c8e617 Compare August 20, 2023 15:32
@prestona
Copy link
Member

prestona commented Aug 21, 2023

@napallday - I'm exited to see that cooperative incremental re-balancing is making its way into Sarama. I think we have apps that could benefit from it. I've had a quick play with the consumergroup_cooperative example and here's what I found:

  1. Multiple co-operative Sarama consumers in the same group work as expected. I tried starting / stopping individual consumers and each time other consumers in the group were revoked/assigned partitions appropriately without a complete re-assignment of all of the topic's partitions. Sometimes Setup(...) is invoked with an empty map of newly assigned partitions, I think corresponding to re-balances where the consumer's set of partitions is unchanged. This appears to be consistent with how the Java client behaves, so I'm not suggesting changing this behavior.
  2. Upgrading Sarama consumers from sticky -> co-operative sticky via rolling restarts worked as expected. E.g. all consumers started out "sticky", then were upgraded to "sticky, sticky-cooperative" , then finally upgraded to "sticky-cooperative" only.
  3. Co-existence with a Java partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor consumer in the same group has mixed results. If a Java consumer was leading the group and then a Sarama consumer joins - this works as expected. If a Sarama consumer was leading the group and a Java consumer joins then the Sarama leader panics:
     2023/08/21 20:49:59 Error from consumer: kafka: insufficient data to decode packet, more bytes expected
    panic: Error from consumer: kafka: insufficient data to decode packet, more bytes expected
    
     goroutine 11 [running]:
     log.Panicf({0x100e23868?, 0x100f5b3d8?}, {0x140000affa0?, 0x14000198af0?, 0x1?})
         /usr/local/go/src/log/log.go:395 +0x68
    main.main.func1()
        /tmp/sarama/examples/consumergroup_cooperative/main.go:119 +0x114
    created by main.main
        /tmp/sarama/examples/consumergroup_cooperative/main.go:110 +0x440
    
    I can grab more debug, if it would be helpful. (This was using Kafka 3.3.1, a Kafka 3.1.2 client, and the consumergroups_cooperative example).

Regarding your Seeking Community Guidance question. IMO it would be great to add cooperative re-balancing support as soon as practical - without necessarily waiting for Sarama v2. For example, Sarama v1 could be extended with the ConsumerV2 method, which would then be re-factored into a more elegant single consumer approach in Sarama v2.

@dnwe dnwe force-pushed the napallday/incremental-cooperative-balance-strategy branch from 1c8e617 to 03e4160 Compare August 21, 2023 20:24
@dnwe dnwe added the feat label Aug 21, 2023
@dnwe
Copy link
Collaborator

dnwe commented Aug 21, 2023

@napallday thanks for putting together this PR!

RE: 3. mentioned by @prestona it looks like a bug in our decoding of the MemberMetadata in the JoinGroups response where we return early if OwnedPartitions is an empty array, so we haven't consumed the whole buffer. I'll fix that up in a separate PR

I think you also need to cope with UserData being empty in the balance strategy to avoid a decoding failure:

diff --git a/balance_strategy.go b/balance_strategy.go
index cc7a0a3..eab26b2 100644
--- a/balance_strategy.go
+++ b/balance_strategy.go
@@ -967,6 +967,9 @@ func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadat
 	// for each partition we create a sorted map of its consumers by generation
 	sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
 	for memberID, meta := range members {
+		if len(meta.UserData) == 0 {
+			continue
+		}
 		consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
 		if err != nil {
 			return nil, nil, err

I'll add this change as well after rebasing your PR on #2618

@dnwe dnwe force-pushed the napallday/incremental-cooperative-balance-strategy branch from 03e4160 to ada6a47 Compare August 21, 2023 22:03
@dnwe
Copy link
Collaborator

dnwe commented Aug 21, 2023

That seemed to work well for me. I ran sarama, rdkafka and (Java) kafka-console-consumer in the same cooperative balance group and ran some chaos testing restarting each of them randomly and each member was able to join/rejoin/lead without causing issues for the others

In the middle of lots of runs I did trigger one protocol decode failure in deserializeTopicPartitionAssignment where it seemed like UserData for one member was 4 bytes (0xff,0xff,0xff,0xff) but it's not clear what caused that. We may just wish to guard the deserializeTopicPartitionAssignment call by ignoring "InsufficientData" err and continuing in the loop

@napallday napallday changed the title feat[POC]: incremental cooperative balance strategy feat[POC]: incremental cooperative rebalance protocol Aug 22, 2023
@napallday
Copy link
Contributor Author

napallday commented Aug 22, 2023

Big thanks to @prestona and @dnwe for the thorough testing and fix! 😄

In the middle of lots of runs I did trigger one protocol decode failure in deserializeTopicPartitionAssignment where it seemed like UserData for one member was 4 bytes (0xff,0xff,0xff,0xff) but it's not clear what caused that. We may just wish to guard the deserializeTopicPartitionAssignment call by ignoring "InsufficientData" err and continuing in the loop

@dnwe Regarding this issue, could you help provide information about the combination of Kafka clients and the version of Kafka server used during testing? I personally conducted tests using Sarama, without direct interaction with rdkafka or Java clients. So far, I have not come across this particular issue.

@dnwe
Copy link
Collaborator

dnwe commented Aug 22, 2023

@napallday no worries, so I set KAFKA_VERSION=3.3.2 and started up docker-compose as used in the FV with the toxiproxy routes configured and created a six partition topic called cooperative-topic. I setup a sarama producer to continuously write some records to the topic.

Then in three different terminals I ran:

# sarama
./consumergroup_cooperative -version 3.3.2 -brokers 127.0.0.1:29091 -group sarama-coop -topics cooperative-topic

# librdkafka
./kcat -b 127.0.0.1:29091 -G sarama-coop -X partition.assignment.strategy=cooperative-sticky cooperative-topic

# java
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29091 --topic cooperative-topic --group sarama-coop --consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

And stopped and started each of them at various points so they all had a turn leading and joining the group

@napallday
Copy link
Contributor Author

napallday commented Aug 22, 2023

The panic will be reproduced following these steps:

Step 1. Run the Sarama consumer (Leader).
Step 2. Run the Java consumer.

This is due to the following improvements of cooperative-sticky strategy implementations within Java. Specifically, in userData, the first 4 bytes represent the generationID (where 4 bytes of 0xff actually translate to -1). This can be observed in the code changes here.

I will address this issue at a later time.

@napallday
Copy link
Contributor Author

Hi @prestona @dnwe,

FYI, I've submitted a commit to fix the issue mentioned above. 🛠️

After extensive testing with different Kafka clients, including librdkafka (v3.3.2), java client (v3.5.0 & v3.3.2), and sarama (v3.3.2), I believe the panic issues have been successfully resolved.

However, during testing with java client (v2.8.1), I observed an occurrence of java.lang.IllegalStateException in this client when the cooperative assignment was being validated by it. It's worth noting that this behavior appears to be specific to the older versions of the Java client. Since this java client is the leader who did the assignments, I believe that the underlying cause might be rooted in these outdated Java client versions rather than sarama.

@dnwe
Copy link
Collaborator

dnwe commented Aug 23, 2023

Interesting. It took me a few attempts, but yes I also managed to trigger that IllegalStateException with a Java 2.8.1 consumer:

[2023-08-23 21:03:16,182] ERROR [Consumer clientId=consumer-sarama-coop-1, groupId=sarama-coop] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [cooperative-topic-2] which are still owned by some members (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-08-23 21:03:16,189] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: Assignor supporting the COOPERATIVE protocol violates its requirements
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.validateCooperativeAssignment(ConsumerCoordinator.java:668)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:592)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:443)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:102)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

@dnwe
Copy link
Collaborator

dnwe commented Aug 23, 2023

@napallday OK I looked over apache/kafka and I see that they skipped this validation code in the kafka-client under KAFKA-13406 and PR #11439 due to issues with the verification and that was the Java client behaviour from 2.8.2 onward

@napallday
Copy link
Contributor Author

Cool! 🎉 Huge thanks to @dnwe for the invetigation.

The explanations provided in the Jira ticket are crystal clear.

By the way, just like this fix in apache/kafka, the official Java clients have some improvements in sticky and cooperative-sticky assignors since the initial introduction of the incremental cooperative balance protocol. In the future, we can integrate these enhancements gradually.

@dnwe dnwe force-pushed the napallday/incremental-cooperative-balance-strategy branch from fe9fdd5 to 4c83bd0 Compare September 12, 2023 13:00
@dnwe dnwe changed the title feat[POC]: incremental cooperative rebalance protocol feat(consumer): incremental cooperative balance strategy (KIP-429) Sep 12, 2023
@dnwe
Copy link
Collaborator

dnwe commented Oct 2, 2023

@napallday were you planning to do the follow up work around metrics, pause/resume and additional FV tests under this PR?

I suppose the alternative would be to merge this as-is, but report the feature as experimental in the changelog so people can test it out in their staging environments

@napallday napallday force-pushed the napallday/incremental-cooperative-balance-strategy branch from 070a073 to 4c83bd0 Compare October 9, 2023 09:15
@napallday napallday force-pushed the napallday/incremental-cooperative-balance-strategy branch from ade333e to bf40615 Compare October 9, 2023 10:11
@napallday
Copy link
Contributor Author

@dnwe Thanks for the reminder.

For the pause/resume feature, I've verified that the behaviour is consistent with Java Kafka.

  • For EAGLE protocol: all paused status will be dropped when rebalance
  • For COOPERATIVE protocol: paused status of revoked partitions will be dropped when rebalance

Regarding the metrics and FV tests, I may add it at a later time due to some personal matters. Therefore, I concur with releasing it as an experimental feature initially.

Copy link

Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur.
If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.

@github-actions github-actions bot added the stale Issues and pull requests without any recent activity label Jan 25, 2024
@wwwjfy
Copy link

wwwjfy commented Jan 26, 2024

Hi @napallday @dnwe, it seems there is no major concern about this PR. Do you think this can be merged soon?

It is indeed a useful feature I'd like to try out. But it's better to use main branch (or better a release) than a pull request branch.

@dnwe
Copy link
Collaborator

dnwe commented Jan 26, 2024

@wwwjfy 👋🏻 thanks for getting in touch

The issue with merging to main at the moment is that the PR branch introduces new interfaces of ConsumeV2 and ConsumerGroupHandlerV2, which we'd then be locked into supporting and ideally we want to be exposing cooperative balance strategy as just a config change for users, rather than requiring them to rewrite their app code to make use of it.

Similarly, if we merge to main without more FV coverage, it might not be obvious to users (unless they read notes on GitHub Releases) that it is considered experimental.

Would it help your testing if I pull the commits into a branch on the main repository rather than them being only on a fork?
i.e.,., if I make a cooperative-balance-strategy branch with these commits then you can go get github.com/IBM/sarama@cooperative-balance-strategy, start calling the new interfaces and experiment with the code and provide feedback?

@dnwe dnwe added stale/exempt Issues and pull requests that should never be closed as stale and removed stale Issues and pull requests without any recent activity labels Jan 26, 2024
@lifepuzzlefun
Copy link

I hope the current ConsumerGroupHandler can still use the cooperative balance stragtegy. If user want to receive the revoke and assign partition, they can provide their own OnAssignPartition and OnRevokePartition method. and the default handler will be provide if user use origin ConsumerGroupHandler without change current code.

@wwwjfy
Copy link

wwwjfy commented Jan 26, 2024

Thanks for the quick response :)

It does make sense, if possible, we maintain a single interface than two. (Sorry I didn't read the change and the comments in this PR)

I'm not sure what's the way forward. Using single interfaces will probably break backward compatibility I guess, which means we'll wait until a major version upgrade as well as the consolidation work before that.

For testing, to me alone, I have my forked version anyway, though for others, what you mentioned may be useful :D

Still evaluating using sarama or confluent sdk which already has this feature. will have feedback if I get anything useful

@dnwe
Copy link
Collaborator

dnwe commented Jan 26, 2024

Yes I think the ideal scenario would be if we can rework this PR to keep the Sarama consumer interface consistent, but expose an opt-in similarly to the Java client via an equivalent to the partition.assignment.strategy config variable

@napallday
Copy link
Contributor Author

I feel it's hard to keep using the ConsumerGroupHandler interface since it's highly coupled with eager rebalance protocol - e.g. ConsumerGroupSession as the function parameter, is generated in every new generation.

As the new implementation ConsumerGroupHandlerV2 also works for the previous one ConsumerGroupHandler, may I know if it's possible to have another major version of Sarama in the future to include this PR?

I can help add fv tests later but it seems to be a huge work...

@aiquestion
Copy link
Contributor

if we want to keep ConsumerGroupHandler, i think the possible way is to :

  • call Setup(ConsumerGroupSession) / Cleanup(ConsumerGroupSession) when new assignment receives / a new rebalance tirggered.
  • change ConsumerGroupSession's Claims/MemberId/GenerationId functions to be thread safe.

so if user don't care about assignment change in their previous code(or they only do some log print), only enable cooperative in config will be enough.
But user will still need to be aware of the API behaviour change when enable cooperative rebalance.

diff := make(map[string][]int32)
for topic, partitions := range map1 {
for _, partition := range partitions {
if _, exist := set[topic][partition]; !exist {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be simplified to if !set[topic][partition]

@@ -19,6 +19,8 @@ const (
// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
StickyBalanceStrategyName = "sticky"

CooperativeStickyBalanceStrategyName = "cooperative-sticky"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a comment for exported variable

om.pomsLock.RLock()
pom := om.poms[topic][partition]
om.pomsLock.RUnlock()
err := pom.Close()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I met panic during local test.

panic: send on closed channel

goroutine 67 [running]:
github.com/IBM/sarama.(*partitionOffsetManager).handleError(0x1400013a300, {0x104c21ac8, 0x104fb5e30})
    /Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:748 +0xac
github.com/IBM/sarama.(*offsetManager).handleResponse(0x140004961a0, 0x14000494008, 0x140003414a0, 0x1400039bde0)
    /Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:491 +0x338
github.com/IBM/sarama.(*offsetManager).flushToBroker(0x140004961a0)
    /Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:381 +0x80
github.com/IBM/sarama.(*offsetManager).Commit(0x140004961a0)
    /Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:357 +0x20
github.com/IBM/sarama.(*offsetManager).mainLoop(0x140004961a0)
    /Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:341 +0x88
github.com/IBM/sarama.withRecover(0x0?)
    /Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/utils.go:43 +0x40
created by github.com/IBM/sarama.newOffsetManagerFromClient in goroutine 23
    /Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:113 +0x230
exit status 2

It happened during a partition is revoked from this consumer.

I tried to track how this happens, and I think this is the case:

goroutine 1:
1.1. consumer is informed a partition is revoked
1.2. it comes here to do cleanup, and this line leads to errors channel closure at line 755
1.3. later this pom is removed from om.poms at line 180

goroutine 2:

2.1 in the mainLoop, om.Commit is triggered every second by default
2.2 it constructs requests based on om.poms (line 440)
2.3 it gets response, and if it's an error ErrIllegalGeneration, send to pom.errors (line 490)

In a race condition, 2.3 happens between 1.2 and 1.3, a panic will happen.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

propose change:

iff --git offset_manager.go offset_manager.go
index 6c40a88..4cfe3f5 100644
--- offset_manager.go
+++ offset_manager.go
@@ -150,10 +150,15 @@ func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) er
                        go func(topic string, partition int32) {
                                defer wg.Done()
 
-                               om.pomsLock.RLock()
+                               om.pomsLock.Lock()
                                pom := om.poms[topic][partition]
-                               om.pomsLock.RUnlock()
                                err := pom.Close()
+                               delete(om.poms[topic], partition)
+                               if len(om.poms[topic]) == 0 {
+                                       delete(om.poms, topic)
+                               }
+
+                               om.pomsLock.Unlock()
                                if err != nil {
                                        errsLock.Lock()
                                        var consumerErrs ConsumerErrors
@@ -174,17 +179,6 @@ func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) er
                }
        }
 
-       om.pomsLock.Lock()
-       for topic, partitions := range topicPartitions {
-               for _, partition := range partitions {
-                       delete(om.poms[topic], partition)
-                       if len(om.poms[topic]) == 0 {
-                               delete(om.poms, topic)
-                       }
-               }
-       }
-
-       om.pomsLock.Unlock()
        if len(errs) > 0 {
                return errs
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for troubleshooting. I think this propose change works, and I also move the logic of last offset flush ahead.

// response and send another join request with that id to actually join the
// group
c.memberID = join.MemberId
return c.retryJoinGroup(ctx, topics, retries+1 /*keep retry time*/, join.Err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the accepted pull request #2759, should this be return c.joinGroup(ctx, topics, retries)?

@dberardo-com
Copy link

any ETA for this feature ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feat stale/exempt Issues and pull requests that should never be closed as stale
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants