diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index 3c0d53bc9da3f..30d28fb722f3d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -443,7 +443,7 @@ public void transitionToFatal() { log.error("Member {} with epoch {} transitioned to fatal state", memberId, memberEpoch); notifyEpochChange(Optional.empty()); - if (previousState == MemberState.UNSUBSCRIBED) { + if (previousState == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) { log.debug("Member {} with epoch {} got fatal error from the broker but it already " + "left the group, so onPartitionsLost callback won't be triggered.", memberId, memberEpoch); return; diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala index 8c8d05c607297..01d18114a0480 100644 --- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala @@ -34,7 +34,7 @@ import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.function.Executable -import org.junit.jupiter.api.{BeforeEach, TestInfo} +import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -135,6 +135,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersAll")) + @Timeout(60) def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String): Unit = { val topic = "topic"