Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Don't Merge] Test #16463

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
Expand Down Expand Up @@ -239,6 +240,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();

// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
// and record there is a resetOffsetEvent in background thread.
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
Expand Down Expand Up @@ -828,8 +830,10 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {

acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
Timer timer = time.timer(defaultApiTimeoutMs);
ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(partitions, OffsetResetStrategy.EARLIEST,
calculateDeadlineMs(timer));
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(resetOffsetEvent);
} finally {
release();
}
Expand All @@ -842,8 +846,10 @@ public void seekToEnd(Collection<TopicPartition> partitions) {

acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
Timer timer = time.timer(defaultApiTimeoutMs);
ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST,
calculateDeadlineMs(timer));
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(resetOffsetEvent);
} finally {
release();
}
Expand All @@ -864,8 +870,9 @@ public long position(TopicPartition partition, Duration timeout) {
Timer timer = time.timer(timeout);
do {
SubscriptionState.FetchPosition position = subscriptions.validPosition(partition);
if (position != null)
if (position != null) {
return position.offset;
}

updateFetchPositions(timer);
timer.update();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public abstract class ApplicationEvent {

public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -99,6 +100,10 @@ public void process(ApplicationEvent event) {
process((ListOffsetsEvent) event);
return;

case RESET_OFFSET:
process((ResetOffsetEvent) event);
return;

case CHECK_AND_UPDATE_POSITIONS:
process((CheckAndUpdatePositionsEvent) event);
return;
Expand Down Expand Up @@ -255,6 +260,14 @@ private void process(final UnsubscribeEvent event) {
}
}

private void process(final ResetOffsetEvent event) {
    // An empty partition collection means "reset every currently assigned partition",
    // mirroring the contract of Consumer.seekToBeginning()/seekToEnd().
    final Collection<TopicPartition> targets;
    if (event.topicPartitions().isEmpty()) {
        targets = subscriptions.assignedPartitions();
    } else {
        targets = event.topicPartitions();
    }
    subscriptions.requestOffsetReset(targets, event.offsetResetStrategy());
    // Kick off the position update and route its completion back to the event's future.
    requestManagers.offsetsRequestManager
        .updateFetchPositions(event.deadlineMs())
        .whenComplete(complete(event.future()));
}

/**
* Check if all assigned partitions have fetch positions. If there are missing positions, fetch offsets and use
* them to update positions in the subscription state.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

/**
* Event to perform {@link AsyncKafkaConsumer#seekToBeginning(Collection)} and {@link AsyncKafkaConsumer#seekToEnd(Collection)}
* in the background thread. This can avoid race conditions when subscription state is updated.
*/
public class ResetOffsetEvent extends CompletableApplicationEvent<Boolean> {

    // Defensive, immutable snapshot of the partitions. The event crosses from the
    // application thread to the background thread, so it must not observe later
    // mutations of the caller's collection (the previous wrap-only approach did).
    private final Collection<TopicPartition> topicPartitions;

    private final OffsetResetStrategy offsetResetStrategy;

    /**
     * @param topicPartitions     partitions to reset; empty means "all currently assigned partitions"
     * @param offsetResetStrategy strategy (e.g. EARLIEST/LATEST) used to pick the reset position
     * @param deadline            absolute time (ms) by which the event must complete
     */
    public ResetOffsetEvent(Collection<TopicPartition> topicPartitions, OffsetResetStrategy offsetResetStrategy, long deadline) {
        super(Type.RESET_OFFSET, deadline);
        this.topicPartitions = Collections.unmodifiableCollection(
            new ArrayList<>(Objects.requireNonNull(topicPartitions, "topicPartitions")));
        this.offsetResetStrategy = Objects.requireNonNull(offsetResetStrategy, "offsetResetStrategy");
    }

    /**
     * @return the partitions to reset; empty means "all currently assigned partitions"
     */
    public Collection<TopicPartition> topicPartitions() {
        return topicPartitions;
    }

    /**
     * @return the strategy used to pick the reset position
     */
    public OffsetResetStrategy offsetResetStrategy() {
        return offsetResetStrategy;
    }

    @Override
    public String toStringBase() {
        return super.toStringBase() + ", topicPartitions=" + topicPartitions + ", offsetStrategy=" + offsetResetStrategy;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.apache.kafka.test.MockConsumerInterceptor;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -1885,6 +1887,68 @@ public void testUnsubscribeWithoutGroupId() {
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}

@Test
public void testSeekToBeginning() {
    // seekToBeginning() should hand a ResetOffsetEvent to the background thread, which
    // in turn requests an EARLIEST offset reset on the subscription state.
    SubscriptionState subscriptionState = mock(SubscriptionState.class);
    TopicPartition tp = new TopicPartition("test", 0);
    Collection<TopicPartition> partitions = Collections.singleton(tp);
    consumer = newConsumer(
        mock(FetchBuffer.class),
        new ConsumerInterceptors<>(Collections.emptyList()),
        mock(ConsumerRebalanceListenerInvoker.class),
        subscriptionState,
        "group-id",
        "client-id");
    completeResetOffsetEventSuccessfully();

    consumer.seekToBeginning(partitions);

    verify(subscriptionState).requestOffsetReset(partitions, OffsetResetStrategy.EARLIEST);
}

@Test
public void testSeekToBeginningWithException() {
    // When the background ResetOffsetEvent fails, seekToBeginning() must surface the
    // failure to the caller since it blocks on the event's result.
    SubscriptionState subscriptionState = mock(SubscriptionState.class);
    Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
    consumer = newConsumer(
        mock(FetchBuffer.class),
        new ConsumerInterceptors<>(Collections.emptyList()),
        mock(ConsumerRebalanceListenerInvoker.class),
        subscriptionState,
        "group-id",
        "client-id");
    completeResetOffsetEventExceptionally(new TimeoutException());

    Assertions.assertThrows(TimeoutException.class, () -> consumer.seekToBeginning(partitions));
}

@Test
public void testSeekToEndWithException() {
    // When the background ResetOffsetEvent fails, seekToEnd() must surface the failure
    // to the caller since it blocks on the event's result.
    // NOTE: the event must be stubbed to fail here — the previous version stubbed a
    // successful completion and then asserted a TimeoutException, which can never pass
    // (the setup was swapped with testSeekToEnd()).
    SubscriptionState subscriptions = mock(SubscriptionState.class);
    Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
    consumer = newConsumer(
        mock(FetchBuffer.class),
        new ConsumerInterceptors<>(Collections.emptyList()),
        mock(ConsumerRebalanceListenerInvoker.class),
        subscriptions,
        "group-id",
        "client-id");
    completeResetOffsetEventExceptionally(new TimeoutException());

    Assertions.assertThrows(TimeoutException.class, () -> consumer.seekToEnd(topics));
}

@Test
public void testSeekToEnd() {
    // seekToEnd() should hand a ResetOffsetEvent to the background thread, which in
    // turn requests a LATEST offset reset on the subscription state.
    // NOTE: the event must be stubbed to complete successfully here — the previous
    // version stubbed a TimeoutException and then expected the seek to succeed
    // (the setup was swapped with testSeekToEndWithException()).
    SubscriptionState subscriptions = mock(SubscriptionState.class);
    Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
    consumer = newConsumer(
        mock(FetchBuffer.class),
        new ConsumerInterceptors<>(Collections.emptyList()),
        mock(ConsumerRebalanceListenerInvoker.class),
        subscriptions,
        "group-id",
        "client-id");
    completeResetOffsetEventSuccessfully();

    consumer.seekToEnd(topics);

    verify(subscriptions).requestOffsetReset(topics, OffsetResetStrategy.LATEST);
}

private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
// Check that an unsubscribe event was generated, and that the consumer waited for it to
// complete processing background events.
Expand Down Expand Up @@ -1940,6 +2004,20 @@ private void completeCommitSyncApplicationEventExceptionally(Exception ex) {
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
}

/**
 * Stub the application event handler so that any ResetOffsetEvent is "processed"
 * inline: the event's partitions and strategy are forwarded to the consumer's
 * subscription state, and addAndGet reports success.
 */
private void completeResetOffsetEventSuccessfully() {
    doAnswer(invocation -> {
        ResetOffsetEvent event = invocation.getArgument(0);
        // Collections.unmodifiableCollection does not override equals/hashCode, so copy
        // the partitions into a HashSet that Mockito's argument matching can compare by
        // value. Copying the whole collection (rather than only the first element, as
        // before) also works for multi-partition events and does not throw on empty ones.
        Collection<TopicPartition> partitions = new java.util.HashSet<>(event.topicPartitions());
        consumer.subscriptions().requestOffsetReset(partitions, event.offsetResetStrategy());
        return true;
    }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ResetOffsetEvent.class));
}

// Stub the application event handler so that the next ResetOffsetEvent fails with the
// given exception, letting tests assert that seekToBeginning()/seekToEnd() propagate it.
private void completeResetOffsetEventExceptionally(Exception ex) {
    doThrow(ex).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ResetOffsetEvent.class));
}

private void completeCommitAsyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
AsyncCommitEvent event = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
Expand All @@ -39,6 +40,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -178,6 +180,18 @@ public void testAssignmentChangeEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}

@Test
public void testResetOffsetEvent() {
    // Processing a ResetOffsetEvent must forward the partitions and the reset strategy
    // straight to the subscription state.
    TopicPartition partition = new TopicPartition("topic", 0);
    Collection<TopicPartition> partitions = Collections.singleton(partition);
    ResetOffsetEvent event =
        new ResetOffsetEvent(partitions, OffsetResetStrategy.EARLIEST, calculateDeadlineMs(time, 100));

    setupProcessor(false);
    processor.process(event);

    verify(subscriptionState).requestOffsetReset(partitions, OffsetResetStrategy.EARLIEST);
}

@Test
public void testSeekUnvalidatedEvent() {
TopicPartition tp = new TopicPartition("topic", 0);
Expand Down
Loading