Commit 959a6d5

KAFKA-19661: Streams groups sometimes describe as NOT_READY when STABLE
Streams groups sometimes describe as NOT_READY even though they are STABLE. That is, the group is configured and all topics exist, but LIST_GROUPS and STREAMS_GROUP_DESCRIBE report the group as not ready.

The root cause seems to be that apache#19802 moved the creation of the soft-state configured topology from the replay path to the heartbeat. LIST_GROUPS and STREAMS_GROUP_DESCRIBE show the snapshot at the last committed offset, so they may not see the configured topology: the configured topology created in the heartbeat is "thrown away", and the group is recreated on the replay path.

We work around this by storing a reference to the streams group object created in the heartbeat immediately in the GroupMetadataManager. The groups map is a snapshottable collection, so if writing the corresponding records to the consumer offsets topic fails, the addition of the streams group is reverted.
1 parent 4f2114a commit 959a6d5
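
The symptom described above is observable through the Admin client. The following is a rough sketch, not part of the commit: the bootstrap address and group id are placeholders, and StreamsGroupDescription#groupState is assumed to expose the reported state (the integration test added below exercises the same describeStreamsGroups API).

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.StreamsGroupDescription;

public class DescribeStreamsGroupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        try (Admin admin = Admin.create(props)) {
            // ListGroups reports the state from the coordinator's snapshot at the
            // last committed offset, not from the in-flight heartbeat state.
            admin.listGroups().all().get().forEach(listing ->
                System.out.println(listing.groupId() + ": " + listing.groupState().orElse(null)));

            // StreamsGroupDescribe reads the same snapshot; before this fix it could
            // report NOT_READY (no configured topology) for a group that is STABLE.
            StreamsGroupDescription description = admin
                .describeStreamsGroups(List.of("stream_group_id")) // placeholder group id
                .all().get()
                .get("stream_group_id");
            System.out.println(description.groupId() + ": " + description.groupState());
        }
    }
}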

3 files changed, +109 -13 lines

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala (85 additions, 1 deletion)

@@ -24,7 +24,7 @@ import java.lang.{Long => JLong}
 import java.time.{Duration => JDuration}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
-import java.util.{Collections, Locale, Optional, Properties}
+import java.util.{Collections, Locale, Optional, Properties, UUID}
 import java.{time, util}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, StreamsRebalanceListener}
 import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
@@ -2317,6 +2318,89 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }

+  @Test
+  def testDescribeStreamsGroups(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroupToDescribe(
+      inputTopic = testTopicName,
+      streamsGroupId = streamsGroupId
+    )
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
+      }, "Stream group not stable yet")
+
+      // Verify the describe call works correctly
+      val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+      val group = describedGroups.get(streamsGroupId)
+      assertNotNull(group)
+      assertEquals(streamsGroupId, group.groupId())
+      assertFalse(group.members().isEmpty)
+      assertNotNull(group.subtopologies())
+      assertFalse(group.subtopologies().isEmpty)
+
+      // Verify the topology contains the expected source and sink topics
+      val subtopologies = group.subtopologies().asScala
+      assertTrue(subtopologies.exists(subtopology =>
+        subtopology.sourceTopics().contains(testTopicName)))
+
+      // Test describing a non-existing group
+      val nonExistingGroup = "non_existing_stream_group"
+      val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
+      assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())
+
+    } finally {
+      Utils.closeQuietly(streams, "streams")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  private def createStreamsGroupToDescribe(
+    inputTopic: String,
+    streamsGroupId: String
+  ): Consumer[Array[Byte], Array[Byte]] = {
+    streamsConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
+    streamsConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = createStreamsConsumer(streamsRebalanceData = new StreamsRebalanceData(
+      UUID.randomUUID(),
+      Optional.empty(),
+      util.Map.of(
+        "subtopology-0", new StreamsRebalanceData.Subtopology(
+          util.Set.of(inputTopic),
+          util.Set.of(),
+          util.Map.of(),
+          util.Map.of(),
+          util.Set.of()
+        )),
+      Map.empty[String, String].asJava
+    ))
+    consumer.subscribe(
+      util.Set.of(inputTopic),
+      new StreamsRebalanceListener {
+        override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
+          Optional.empty()
+
+        override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] =
+          Optional.empty()
+
+        override def onAllTasksLost(): Optional[Exception] =
+          Optional.empty()
+      }
+    )
+    consumer.poll(JDuration.ofMillis(500L))
+    consumer
+  }

   /**
    * Test the consumer group APIs for member removal.

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (16 additions, 5 deletions)

@@ -850,11 +850,16 @@ StreamsGroup getOrCreateStreamsGroup(
     ) {
         Group group = groups.get(groupId);

+        // Streams groups are inserted immediately into the `groups` map to allow soft-state
         if (group == null) {
-            return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            StreamsGroup newGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            groups.put(groupId, newGroup);
+            return newGroup;
         } else if (maybeDeleteEmptyClassicGroup(group, records)) {
+            StreamsGroup newGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            groups.put(groupId, newGroup);
             log.info("[GroupId {}] Converted the empty classic group to a streams group.", groupId);
-            return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            return newGroup;
         } else {
             return castToStreamsGroup(group);
         }
@@ -1029,8 +1034,8 @@ private StreamsGroup getOrMaybeCreatePersistedStreamsGroup(
         } else if (group.type() == STREAMS) {
             return (StreamsGroup) group;
         } else {
-            // We don't support upgrading/downgrading between protocols at the moment, so
-            // we throw an exception if a group exists with the wrong type.
+            // Conversion between groups is handled explicitly in the heartbeat implementation.
+            // We should not convert by reading a record, so this is an illegal state.
             throw new IllegalStateException(String.format("Group %s is not a streams group.", groupId));
         }
     }
@@ -6005,7 +6010,13 @@ public void replay(

         if (value == null) {
             // Tombstone. Group should be removed.
-            removeGroup(groupId);
+            // In case of streams groups, which get inserted into memory immediately to store soft state,
+            // It may happen that the groups map contains the new streams groups already, and the classic group
+            // was removed already. In this case, we can ignore the tombstone.
+            Group group = groups.get(groupId);
+            if (group instanceof ClassicGroup) {
+                removeGroup(groupId);
+            }
         } else {
             List<ClassicGroupMember> loadedMembers = new ArrayList<>();
             for (GroupMetadataValue.MemberMetadata member : value.members()) {
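
The revert guarantee relied on above comes from the coordinator's timeline collections. Below is a minimal, self-contained sketch, not the coordinator code itself, assuming the org.apache.kafka.timeline classes SnapshotRegistry and TimelineHashMap that back the coordinator's groups map; a String stands in for the StreamsGroup value and epoch 0 stands in for the last committed offset.

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

public class SnapshottableGroupsSketch {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        // Stand-in for the coordinator's snapshottable `groups` map.
        TimelineHashMap<String, String> groups = new TimelineHashMap<>(registry, 16);

        // Snapshot taken at the last committed offset (epoch 0 in this sketch).
        registry.getOrCreateSnapshot(0);

        // Heartbeat path: the new streams group is inserted eagerly as soft state.
        groups.put("stream_group_id", "streams group soft state");

        // If the corresponding records cannot be written to the consumer offsets topic,
        // the runtime reverts to the last committed snapshot and the insertion is undone.
        registry.revertToSnapshot(0);
        System.out.println(groups.containsKey("stream_group_id")); // prints false
    }
}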

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java (8 additions, 7 deletions)

@@ -16378,6 +16378,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
             StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
         );

+        assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
         assertRecordsEquals(expectedRecords, result.records());
     }

@@ -16461,6 +16462,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
             StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
         );

+        assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
         assertRecordsEquals(expectedRecords, result.records());
     }

@@ -17194,8 +17196,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId)
                 .setMemberEpoch(1)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(-1),
+                .setHeartbeatIntervalMs(5000),
             result.response().data()
         );
@@ -18229,7 +18230,7 @@ public void testStreamsRebalanceTimeoutLifecycle() {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(-1),
+                .setEndpointInformationEpoch(0),
             result.response().data()
         );

@@ -18258,7 +18259,7 @@
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(0),
+                .setEndpointInformationEpoch(1),
             result.response().data()
         );

@@ -18289,7 +18290,7 @@
                 .setMemberId(memberId1)
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(0),
+                .setEndpointInformationEpoch(1),
             result.response().data()
         );

@@ -18387,7 +18388,7 @@ public void testStreamsRebalanceTimeoutExpiration() {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(-1),
+                .setEndpointInformationEpoch(0),
             result.response().data()
         );

@@ -18415,7 +18416,7 @@
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(0),
+                .setEndpointInformationEpoch(1),
             result.response().data()
         );
