Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void onWorkerFinished(final String messageId, final Map<String, Object> m
}

@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress) {
if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) {
LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName);
listeners.forEach(QueueWatchDogListener::reset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

/**
* Class to be used at startup. The Scheduler should not start before it is known how many messages are still on the queue.
* This to register any work that is still on the queuue and to properly calculate load metrics.
* Because the Taskmanager is not aware of the tasks already on the queue and therefore otherwise these messaages won't be counted in the metrics.
* This to register any work that is still on the queue and to properly calculate load metrics.
* Because the Task Manager is not aware of the tasks already on the queue and therefore otherwise these messages won't be counted in the metrics.
* This can result in the metrics being skewed, and thereby negatively reporting load metrics.
*/
public class StartupGuard implements WorkerSizeObserver {
Expand All @@ -47,7 +47,7 @@ public void waitForOpen() throws InterruptedException {
}

@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) {
synchronized (openSemaphore) {
if (!open) {
open = true;
Expand Down
Loading
Loading