Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -402,6 +403,8 @@ public void flushAndAwaitPendingEvents() {
throw fatalException;
}

logInternalSummary(false);

try {
// Make a (non-blocking) pass through the finalizing queue up front, to
// free up any recently-processed events before we attempt to drain the
Expand Down Expand Up @@ -473,6 +476,7 @@ private int finalizeAtLeastOneEvent() {
return numFinalized;
}

Instant blockedCheckStart = Instant.now();
while (true) {
// There is a low chance that an event fails in such a way that it's never placed on
// the finalizing queue (this _should_ never happen unless there is a bug, which is
Expand All @@ -490,6 +494,41 @@ private int finalizeAtLeastOneEvent() {
}

checkFatalExceptionsFromAsyncThreadPool();
if (Duration.between(blockedCheckStart, Instant.now()).compareTo(Duration.ofMinutes(1)) > 0) {
log.info("blocked on finalize for at least a minute");
logInternalSummary(true);
blockedCheckStart = Instant.now();
}
}
}

private void logInternalSummary(boolean info) {
final var inFlight
= Optional.ofNullable(threadPool.getInFlight(asyncProcessorName, taskId.partition()))
.map(Map::size)
.map(Object::toString)
.orElse("null");
final String msg =
"pending({}), finalizable({}), in-flight({}), shutdown({}), scheduled({}), waiting({})";
if (info) {
log.info(msg,
pendingEvents.size(),
finalizingQueue.size(),
inFlight,
threadPool.isShutdown(),
schedulingQueue.size(),
schedulingQueue.blockedEntries()
);
} else {
// TODO: switch to a periodic log
log.debug(msg,
pendingEvents.size(),
finalizingQueue.size(),
inFlight,
threadPool.isShutdown(),
schedulingQueue.size(),
schedulingQueue.blockedEntries()
);
}
}

Expand All @@ -505,6 +544,8 @@ private int finalizeAtLeastOneEvent() {
*/
private void maybeBackOffEnqueuingNewEventWithKey(final KIn key) {
while (schedulingQueue.keyQueueIsFull(key)) {
// TODO: switch to a periodic log
log.debug("key queue is full. back off until there is room on key queue");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's log which key is backing off, size of queue and back off time (if that's easily accessible)

drainSchedulingQueue();

if (schedulingQueue.keyQueueIsFull(key)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public void removeProcessor(final String processorName, final int partition) {
if (inFlightForTask != null) {
log.info("Cancelling {} pending records for {}[{}]",
inFlightForTask.size(), processorName, partition);
if (!inFlightForTask.isEmpty()) {
log.info("ANTITHESIS SOMETIMES: cancelling {} pending records for {}[{}]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is ANTITHESIS SOMETIMES a pragma?

inFlightForTask.size(), processorName, partition);
}
inFlightForTask.values().forEach(f -> f.future().cancel(true));
}
}
Expand Down Expand Up @@ -253,6 +257,10 @@ public void shutdown() {
executor.shutdownNow();
}

public boolean isShutdown() {
return executor.isShutdown();
}

static class InFlightEvent {
private final CompletableFuture<StreamsException> future;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public void addFinalizableEvent(
finalizableRecords.addLast(event);
}

public int size() {
return finalizableRecords.size();
}

/**
* See {@link WriteOnlyFinalizingQueue#addFailedEvent}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public boolean isEmpty() {
return processableEvents.isEmpty() && blockedEvents.isEmpty();
}

public int size() {
return blockedEvents.values().stream().mapToInt(KeyEventQueue::size).sum();
}

public int blockedEntries() {
return blockedEvents.values().stream().mapToInt(q -> q.blockedEvents.size()).sum();
}

public int totalEnqueuedEvents() {
return processableEvents.size() + blockedEvents.values().stream()
.mapToInt(keyEventQueue -> keyEventQueue.blockedEvents.size())
Expand Down Expand Up @@ -204,6 +212,9 @@ public void addBlockedEvent(final AsyncEvent event) {
throw new IllegalStateException("Attempted to add event while key queue was full");
}

// todo: use a periodic logger
log.debug("enqueuing key onto blocked async processor scheduler queue");

blockedEvents.add(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ public boolean isEmpty() {
return wrapped.isEmpty();
}

@Override
public int size() {
return wrapped.size();
}

@Override
public int blockedEntries() {
return wrapped.blockedEntries();
}

@Override
public int totalEnqueuedEvents() {
return wrapped.totalEnqueuedEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ public interface SchedulingQueue<KIn> {
void offer(AsyncEvent event);

boolean keyQueueIsFull(KIn key);

int size();

int blockedEntries();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: numBlockedEntries()

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class CommitBuffer<K extends Comparable<K>, P>
private KafkaFuture<DeletedRecords> deleteRecordsFuture = KafkaFuture.completedFuture(null);

private Instant lastFlush;
private long lastFlushedOffset = -1;

static <K extends Comparable<K>, P> CommitBuffer<K, P> from(
final BatchFlusher<K, P> batchFlusher,
Expand Down Expand Up @@ -409,6 +410,18 @@ private boolean triggerFlush() {
}

public void flush(final long consumedOffset) {
if (consumedOffset < lastFlushedOffset) {
log.error("trying to commit an offset {} older than last flushed {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when can this happen? Can we make this error message more actionable (or is it safe to ignore/just got fenced?)

consumedOffset,
lastFlushedOffset
);
throw new IllegalStateException(String.format(
"trying to commit an offset(%d) older than last flushed(%d)",
consumedOffset,
lastFlushedOffset
));
}

if (!triggerFlush()) {
return;
}
Expand All @@ -417,6 +430,7 @@ public void flush(final long consumedOffset) {

doFlush(consumedOffset, maxBatchSize);

lastFlushedOffset = consumedOffset;
lastFlush = clock.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public synchronized List<ResponsiveStoreRegistration> getRegisteredStoresForChan
"there should always be a store for the thread (%s) if there are stores registered "
+ "for this topic partition (%s)", threadId, topicPartition));
}
if (storesForThread.size() > 1) {
LOGGER.warn("found more than 1 registration: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably already in each registration but let's just add found more than 1 registration for {threadId}: {}"

storesForThread.stream().map(ResponsiveStoreRegistration::toString)
.collect(Collectors.joining(","))
);
}
return storesForThread;
}

Expand Down