Skip to content

Commit

Permalink
KAFKA-17132 Revisit testMissingOffsetNoResetPolicy for AsyncConsumer (#…
Browse files Browse the repository at this point in the history
…16587)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
TaiJuWu authored Jul 19, 2024
1 parent bc031c0 commit f09ead1
Showing 1 changed file with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -914,11 +914,9 @@ private void initMetadata(MockClient mockClient, Map<String, Integer> partitionC
mockClient.updateMetadata(initialMetadata);
}

// TODO: this test triggers a bug with the CONSUMER group protocol implementation.
// The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
@EnumSource(value = GroupProtocol.class)
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws InterruptedException {
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
Expand All @@ -935,7 +933,22 @@ public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {

// lookup committed offset and find nothing
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));

if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
// by the background thread, so it can realize there are no committed offsets and then
// throw the NoOffsetForPartitionException
TestUtils.waitForCondition(() -> {
try {
consumer.poll(Duration.ZERO);
return false;
} catch (NoOffsetForPartitionException e) {
return true;
}
}, "Consumer was not able to update fetch positions on continuous calls with 0 timeout");
} else {
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
}
}

@ParameterizedTest
Expand Down

0 comments on commit f09ead1

Please sign in to comment.