Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17505: New consumer seekToBeginning/End should run in background thread #17230

Open
wants to merge 14 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
Expand Down Expand Up @@ -825,27 +826,24 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)

/**
 * Seek to the first offset for each of the given partitions. The actual offset reset is
 * performed in the background thread via a {@code ResetOffsetEvent} to avoid racing with
 * concurrent subscription-state updates.
 *
 * @param partitions partitions to reset; if empty, all currently assigned partitions are used
 * @throws IllegalArgumentException if {@code partitions} is null
 */
@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
    seekWithResetEventInternal(partitions, OffsetResetStrategy.EARLIEST);
}

/**
 * Seek to the last offset for each of the given partitions. Delegates to the shared
 * reset-event path so the reset runs in the background thread, mirroring
 * {@link #seekToBeginning(Collection)}.
 *
 * @param partitions partitions to reset; if empty, all currently assigned partitions are used
 */
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
    seekWithResetEventInternal(partitions, OffsetResetStrategy.LATEST);
}

private void seekWithResetEventInternal(Collection<TopicPartition> partitions, OffsetResetStrategy offsetResetStrategy) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot be null");

acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
Timer timer = time.timer(defaultApiTimeoutMs);
ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(partitions, offsetResetStrategy,
calculateDeadlineMs(timer));
applicationEventHandler.addAndGet(resetOffsetEvent);
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public abstract class ApplicationEvent {

public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -99,6 +100,10 @@ public void process(ApplicationEvent event) {
process((ListOffsetsEvent) event);
return;

case RESET_OFFSET:
process((ResetOffsetEvent) event);
return;

case CHECK_AND_UPDATE_POSITIONS:
process((CheckAndUpdatePositionsEvent) event);
return;
Expand Down Expand Up @@ -261,6 +266,17 @@ private void process(final UnsubscribeEvent event) {
}
}

/**
 * Handles a {@code ResetOffsetEvent}: requests an offset reset for the event's partitions
 * (or every assigned partition when the event carries none) and completes the event's
 * future, propagating any failure through the future instead of throwing.
 */
private void process(final ResetOffsetEvent event) {
    try {
        Collection<TopicPartition> partitions = event.topicPartitions();
        if (partitions.isEmpty()) {
            // An empty collection means "reset everything currently assigned".
            partitions = subscriptions.assignedPartitions();
        }
        subscriptions.requestOffsetReset(partitions, event.offsetResetStrategy());
        event.future().complete(null);
    } catch (Exception e) {
        event.future().completeExceptionally(e);
    }
}

/**
* Check if all assigned partitions have fetch positions. If there are missing positions, fetch offsets and use
* them to update positions in the subscription state.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

/**
* Event to perform {@link AsyncKafkaConsumer#seekToBeginning(Collection)} and {@link AsyncKafkaConsumer#seekToEnd(Collection)}
* in the background thread. This can avoid race conditions when subscription state is updated.
*/
public class ResetOffsetEvent extends CompletableApplicationEvent<Void> {

private final Collection<TopicPartition> topicPartitions;

private final OffsetResetStrategy offsetResetStrategy;

public ResetOffsetEvent(Collection<TopicPartition> topicPartitions, OffsetResetStrategy offsetResetStrategy, long deadline) {
super(Type.RESET_OFFSET, deadline);
this.topicPartitions = Collections.unmodifiableCollection(topicPartitions);
this.offsetResetStrategy = Objects.requireNonNull(offsetResetStrategy);
}

public Collection<TopicPartition> topicPartitions() {
return topicPartitions;
}

public OffsetResetStrategy offsetResetStrategy() {
TaiJuWu marked this conversation as resolved.
Show resolved Hide resolved
return offsetResetStrategy;
}

@Override
public String toStringBase() {
return super.toStringBase() + ", topicPartitions=" + topicPartitions + ", offsetStrategy=" + offsetResetStrategy;
}
}
TaiJuWu marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
Expand Down Expand Up @@ -1904,6 +1905,68 @@ public void testUnsubscribeWithoutGroupId() {
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}

@Test
public void testSeekToBeginning() {
    // seekToBeginning should be routed through the background thread and end up
    // requesting an EARLIEST offset reset for the supplied partitions.
    SubscriptionState subscriptionState = mock(SubscriptionState.class);
    Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
    consumer = newConsumer(
            mock(FetchBuffer.class),
            new ConsumerInterceptors<>(Collections.emptyList()),
            mock(ConsumerRebalanceListenerInvoker.class),
            subscriptionState,
            "group-id",
            "client-id");
    completeResetOffsetEventSuccessfully();

    consumer.seekToBeginning(partitions);

    verify(subscriptionState).requestOffsetReset(partitions, OffsetResetStrategy.EARLIEST);
}

@Test
public void testSeekToBeginningWithException() {
    // A failure while processing the reset event in the background thread must
    // surface to the caller of seekToBeginning.
    SubscriptionState subscriptionState = mock(SubscriptionState.class);
    Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
    consumer = newConsumer(
            mock(FetchBuffer.class),
            new ConsumerInterceptors<>(Collections.emptyList()),
            mock(ConsumerRebalanceListenerInvoker.class),
            subscriptionState,
            "group-id",
            "client-id");
    completeResetOffsetEventExceptionally(new TimeoutException());

    assertThrows(TimeoutException.class, () -> consumer.seekToBeginning(partitions));
}

@Test
public void testSeekToEndWithException() {
    // A failure while processing the reset event in the background thread must
    // surface to the caller of seekToEnd.
    SubscriptionState subscriptionState = mock(SubscriptionState.class);
    Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
    consumer = newConsumer(
            mock(FetchBuffer.class),
            new ConsumerInterceptors<>(Collections.emptyList()),
            mock(ConsumerRebalanceListenerInvoker.class),
            subscriptionState,
            "group-id",
            "client-id");
    completeResetOffsetEventExceptionally(new TimeoutException());

    assertThrows(TimeoutException.class, () -> consumer.seekToEnd(partitions));
}

@Test
public void testSeekToEnd() {
    // seekToEnd should be routed through the background thread and end up
    // requesting a LATEST offset reset for the supplied partitions.
    SubscriptionState subscriptionState = mock(SubscriptionState.class);
    Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
    consumer = newConsumer(
            mock(FetchBuffer.class),
            new ConsumerInterceptors<>(Collections.emptyList()),
            mock(ConsumerRebalanceListenerInvoker.class),
            subscriptionState,
            "group-id",
            "client-id");
    completeResetOffsetEventSuccessfully();

    consumer.seekToEnd(partitions);

    verify(subscriptionState).requestOffsetReset(partitions, OffsetResetStrategy.LATEST);
}

private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
// Check that an unsubscribe event was generated, and that the consumer waited for it to
// complete processing background events.
Expand Down Expand Up @@ -1959,6 +2022,20 @@ private void completeCommitSyncApplicationEventExceptionally(Exception ex) {
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
}

// Stubs applicationEventHandler.addAndGet(ResetOffsetEvent) to apply the reset
// synchronously against the consumer's subscription state, simulating successful
// processing by the background thread.
private void completeResetOffsetEventSuccessfully() {
    doAnswer(invocation -> {
        ResetOffsetEvent event = invocation.getArgument(0);
        // The event holds a Collections.unmodifiableCollection wrapper, which does not
        // override equals/hashCode; rebuild a concrete Set so Mockito's verify(...)
        // argument matching compares by content.
        Collection<TopicPartition> partitions = Collections.singleton(event.topicPartitions().iterator().next());
        consumer.subscriptions().requestOffsetReset(partitions, event.offsetResetStrategy());
        return true;
    }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ResetOffsetEvent.class));
}

// Stubs applicationEventHandler.addAndGet(ResetOffsetEvent) to throw the given
// exception, simulating a failed/timed-out reset in the background thread.
private void completeResetOffsetEventExceptionally(Exception ex) {
    doThrow(ex).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ResetOffsetEvent.class));
}

private void completeCommitAsyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
AsyncCommitEvent event = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
Expand All @@ -39,6 +40,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -48,6 +50,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

import static org.apache.kafka.clients.consumer.internals.events.ApplicationEvent.Type.RESET_OFFSET;
import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -177,6 +180,22 @@ public void testAssignmentChangeEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}

@Test
public void testResetOffsetEvent() {
    // Processing a RESET_OFFSET event should forward the event's partitions and
    // strategy straight to SubscriptionState.requestOffsetReset.
    Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic", 0));
    OffsetResetStrategy resetStrategy = OffsetResetStrategy.LATEST;

    ResetOffsetEvent event = mock(ResetOffsetEvent.class);
    doReturn(partitions).when(event).topicPartitions();
    doReturn(resetStrategy).when(event).offsetResetStrategy();
    doReturn(RESET_OFFSET).when(event).type();
    doReturn(CompletableFuture.completedFuture(null)).when(event).future();
    setupProcessor(false);

    processor.process(event);

    verify(subscriptionState).requestOffsetReset(partitions, resetStrategy);
}

@Test
public void testSeekUnvalidatedEvent() {
TopicPartition tp = new TopicPartition("topic", 0);
Expand Down