Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Sep 22, 2024
1 parent 0c1ff17 commit 1cebf23
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ public void transitionToJoining() {
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
if (isNotInGroup()) {
if (isNotInGroup() || state == MemberState.JOINING) {
if (state == MemberState.FENCED) {
clearAssignment();
transitionTo(MemberState.UNSUBSCRIBED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ private void process(final SubscriptionChangeEvent ignored) {
*/
private void process(final UnsubscribeEvent event) {
if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
System.out.println("UnsubscribeEvent: " + event);
CompletableFuture<Void> future = requestManagers.consumerHeartbeatRequestManager.get().membershipManager().leaveGroup();
System.out.println("UnsubscribeEvent: " + future.isCompletedExceptionally());
future.whenComplete(complete(event.future()));
} else {
// If the consumer is not using the group management capabilities, we still need to clear all assignments it may have.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
}, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")

assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
Thread.sleep(1000)
}
}

Expand Down

0 comments on commit 1cebf23

Please sign in to comment.