Commit 146a17d

dclimfjy authored and committed
KafkaIndexTask: allow pause to break out of retry loop (apache#3401)
1 parent 8d2ae14 · commit 146a17d

File tree

2 files changed: +57 −27 lines changed


extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java

Lines changed: 29 additions & 27 deletions
@@ -29,7 +29,6 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -40,7 +39,6 @@
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import com.metamx.common.ISE;
-import com.metamx.common.RetryUtils;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.logger.Logger;
 import com.metamx.common.parsers.ParseException;
@@ -97,7 +95,6 @@
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -121,6 +118,7 @@ public enum Status
   private static final String TYPE = "index_kafka";
   private static final Random RANDOM = new Random();
   private static final long POLL_TIMEOUT = 100;
+  private static final long POLL_RETRY_MS = 30000;
   private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
 
   private final DataSchema dataSchema;
@@ -161,6 +159,8 @@ public enum Status
   private final Lock pauseLock = new ReentrantLock();
   private final Condition hasPaused = pauseLock.newCondition();
   private final Condition shouldResume = pauseLock.newCondition();
+  private final Lock pollRetryLock = new ReentrantLock();
+  private final Condition isAwaitingRetry = pollRetryLock.newCondition();
   private volatile boolean pauseRequested = false;
   private volatile long pauseMillis = 0;
 
@@ -342,6 +342,7 @@ public void run()
       // Main loop.
       // Could eventually support leader/follower mode (for keeping replicas more in sync)
       boolean stillReading = !assignment.isEmpty();
+      status = Status.READING;
       try {
        while (stillReading) {
          if (possiblyPause(assignment)) {
@@ -363,30 +364,23 @@ public void run()
          // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
          // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
          // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
-          final ConsumerRecords<byte[], byte[]> records = RetryUtils.retry(
-              new Callable<ConsumerRecords<byte[], byte[]>>()
-              {
-                @Override
-                public ConsumerRecords<byte[], byte[]> call() throws Exception
-                {
-                  try {
-                    return consumer.poll(POLL_TIMEOUT);
-                  }
-                  finally {
-                    status = Status.READING;
-                  }
-                }
-              },
-              new Predicate<Throwable>()
-              {
-                @Override
-                public boolean apply(Throwable input)
-                {
-                  return input instanceof OffsetOutOfRangeException;
-                }
-              },
-              Integer.MAX_VALUE
-          );
+          ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
+          try {
+            records = consumer.poll(POLL_TIMEOUT);
+          }
+          catch (OffsetOutOfRangeException e) {
+            log.warn("OffsetOutOfRangeException with message [%s], retrying in %dms", e.getMessage(), POLL_RETRY_MS);
+            pollRetryLock.lockInterruptibly();
+            try {
+              long nanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS);
+              while (nanos > 0L && !pauseRequested) {
+                nanos = isAwaitingRetry.awaitNanos(nanos);
+              }
+            }
+            finally {
+              pollRetryLock.unlock();
+            }
+          }
 
          for (ConsumerRecord<byte[], byte[]> record : records) {
            if (log.isTraceEnabled()) {
@@ -689,6 +683,14 @@ public Response pause(@QueryParam("timeout") @DefaultValue("0") final long timeo
      pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout;
      pauseRequested = true;
 
+      pollRetryLock.lockInterruptibly();
+      try {
+        isAwaitingRetry.signalAll();
+      }
+      finally {
+        pollRetryLock.unlock();
+      }
+
      if (isPaused()) {
        shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
      }
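
The heart of the change is replacing the unbounded RetryUtils.retry loop with a timed wait on a Condition: when poll throws OffsetOutOfRangeException, the task parks for up to POLL_RETRY_MS, and pause() signals the condition so the wait ends immediately instead of blocking the pause request for the full backoff. Below is a minimal, standalone sketch of that lock/condition pattern; the class and member names are illustrative only and are not part of the patch.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of the awaitNanos/signalAll pattern the patch relies on.
class InterruptiblePollBackoff
{
  private final Lock retryLock = new ReentrantLock();
  private final Condition wakeUp = retryLock.newCondition();
  private volatile boolean pauseRequested = false;

  // Waits up to retryMs, but returns early if pause() is called in the meantime.
  void backoff(long retryMs) throws InterruptedException
  {
    retryLock.lockInterruptibly();
    try {
      long nanos = TimeUnit.MILLISECONDS.toNanos(retryMs);
      while (nanos > 0L && !pauseRequested) {
        // awaitNanos returns the remaining time; looping guards against spurious wakeups.
        nanos = wakeUp.awaitNanos(nanos);
      }
    }
    finally {
      retryLock.unlock();
    }
  }

  // Called from another thread (e.g. an HTTP pause endpoint) to break the wait immediately.
  void pause() throws InterruptedException
  {
    pauseRequested = true;
    retryLock.lockInterruptibly();
    try {
      wakeUp.signalAll();
    }
    finally {
      retryLock.unlock();
    }
  }
}

Because the wait loop re-checks both pauseRequested and the remaining time on every wakeup, a signalAll from pause() and a spurious wakeup are both handled correctly, and the backoff never outlives a pause request.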

extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 28 additions & 0 deletions
@@ -1200,6 +1200,34 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3));
   }
 
+  @Test(timeout = 30_000L)
+  public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
+  {
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            "sequence0",
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            kafkaServer.consumerProperties(),
+            true,
+            false,
+            null
+        ),
+        null
+    );
+
+    runTask(task);
+
+    while (!task.getStatus().equals(KafkaIndexTask.Status.READING)) {
+      Thread.sleep(2000);
+    }
+
+    task.pause(0);
+
+    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
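
The new test exercises the retry wait end to end: it starts a task at an offset expected to trigger OffsetOutOfRangeException, waits until the task reports READING, then calls pause(0) and asserts it reaches PAUSED within the 30-second test timeout. The same wait-then-pause flow can be sketched with an explicit deadline instead of an open-ended sleep loop; the helper below is a hypothetical Java 8 illustration (awaitStatus, its parameters, and the Supplier usage are assumptions, not part of the test code).

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper mirroring the test's wait-then-pause flow, with a deadline on the wait itself.
final class StatusWait
{
  private StatusWait() {}

  // Polls the supplied status until it matches `expected` or the deadline passes.
  static <T> boolean awaitStatus(Supplier<T> status, T expected, long timeoutMs) throws InterruptedException
  {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (!expected.equals(status.get())) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      Thread.sleep(200);
    }
    return true;
  }
}

// Usage, mirroring the test body (task and Status are the Druid types from the diff above):
//   awaitStatus(task::getStatus, KafkaIndexTask.Status.READING, 30_000L);
//   task.pause(0);
//   // expect task.getStatus() == KafkaIndexTask.Status.PAUSED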

0 commit comments
