diff --git a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitor.java b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitor.java deleted file mode 100644 index 928533e..0000000 --- a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitor.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Copyright the State of the Netherlands - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see http://www.gnu.org/licenses/. - */ -package nl.aerius.taskmanager.client.mq; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.AlreadyClosedException; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.ShutdownSignalException; - -import nl.aerius.taskmanager.client.BrokerConnectionFactory; - -/** - * Watches the RabbitMQ event channel for changes in consumers to dynamically monitor the number of workers added or removed. - */ -public class RabbitMQWorkerMonitor { - - /** - * Observer registered for a specific worker queue that gets called when an update event is received. - */ - public interface RabbitMQWorkerObserver { - /** - * Gives updated values for worker queue size and utilisation. - * - * @param size number of workers available - * @param utilisation number of workers being bussy - */ - void updateWorkers(String workerQueueName, int size, int utilisation); - } - - private static final Logger LOG = LoggerFactory.getLogger(RabbitMQWorkerMonitor.class); - - public static final String EXCHANGE_TYPE = "fanout"; - public static final String AERIUS_EVENT_EXCHANGE = "aerius.events"; - - public static final String HEADER_PARAM_QUEUE = "queue"; - public static final String HEADER_PARAM_WORKER_SIZE = "size"; - public static final String HEADER_PARAM_UTILISATION = "utilisation"; - - private final BrokerConnectionFactory factory; - private final List observers = new ArrayList<>(); - private final AtomicBoolean tryStartingConsuming = new AtomicBoolean(); - private boolean isShutdown; - private DefaultConsumer consumer; - private Channel channel; - private String queueName; - - /** - * Constructor - * - * @param factory connection factory - */ - public RabbitMQWorkerMonitor(final BrokerConnectionFactory factory) { - this.factory = factory; - } - - /** - * Add the observer. - * - * @param observer observer to add - */ - public void addObserver(final RabbitMQWorkerObserver observer) { - synchronized (observers) { - observers.add(observer); - } - } - - /** - * Remove the observer. - * - * @param observer observer to remove - */ - public void removeObserver(final RabbitMQWorkerObserver observer) { - synchronized (observers) { - observers.remove(observer); - } - } - - /** - * Start the watcher. - * - * @throws IOException - */ - public void start() throws IOException { - tryStartingConsuming.set(true); - while (!isShutdown) { - try { - stopAndStartConsumer(); - LOG.debug("Successfully (re)started consumer RabbitMQWorkerMonitor"); - if (consumer.getChannel().isOpen()) { - tryStartingConsuming.set(false); - break; - } - } catch (final ShutdownSignalException | IOException e1) { - LOG.warn("(Re)starting consumer RabbitMQWorkerMonitor failed, retrying", e1); - } - } - } - - private void stopAndStartConsumer() throws IOException { - synchronized (this) { - if (consumer != null) { - try { - if (consumer.getChannel().isOpen()) { - consumer.getChannel().basicCancel(queueName); - } - } catch (final AlreadyClosedException | IOException e) { - LOG.debug("Exception while stopping consuming, ignoring.", e); - } - } - final Connection c = factory.getConnection(); - channel = c.createChannel(); - channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE); - queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, AERIUS_EVENT_EXCHANGE, ""); - consumer = new MonitorConsumer(channel); - channel.basicConsume(queueName, true, consumer); - } - } - - private void handleDelivery(final AMQP.BasicProperties properties) { - final Map headers = properties.getHeaders(); - final String workerQueueName = getParam(headers, HEADER_PARAM_QUEUE); - final int workerSize = getParamInt(headers, HEADER_PARAM_WORKER_SIZE, -1); - final int utilisationSize = getParamInt(headers, HEADER_PARAM_UTILISATION, -1); - - synchronized (observers) { - observers.forEach(ro -> ro.updateWorkers(workerQueueName, workerSize, utilisationSize)); - } - } - - /** - * Returns the value in the header parameter or null. - * - * @param headers headers map - * @param param param to return the value for - * @return value of param or null if not present - */ - private static String getParam(final Map headers, final String param) { - final Object value = headers.get(param); - - return value == null ? null : value.toString(); - } - - /** - * Returns the int value in the header parameter or the other value. - * - * @param headers headers map - * @param param param to return the value for - * @param other if param is not present this value is returned - * @return int value of param or other if not present - */ - private static int getParamInt(final Map headers, final String param, final int other) { - final String value = getParam(headers, param); - - try { - return value == null ? other : Integer.valueOf(value); - } catch (final NumberFormatException e) { - LOG.error("Error parsing param " + param + ", not an int value but: " + value, e); - return other; - } - } - - /** - * Shutdown the monitoring process. - */ - public void shutdown() { - isShutdown = true; - try { - channel.close(); - } catch (final IOException | TimeoutException e) { - LOG.trace("Worker event monitor shutdown failed", e); - } - } - - private class MonitorConsumer extends DefaultConsumer { - public MonitorConsumer(final Channel channel) { - super(channel); - } - - @Override - public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) - throws IOException { - RabbitMQWorkerMonitor.this.handleDelivery(properties); - } - - @Override - public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) { - if (sig.isInitiatedByApplication()) { - isShutdown = true; - LOG.info("Worker event monitor {} was shut down by the application.", consumerTag); - } else { - LOG.debug("Worker event monitor {} was shut down.", consumerTag); - // restart - try { - abort(); - if (!tryStartingConsuming.get()) { - start(); - LOG.info("Restarted worker event monitor {}", consumerTag); - } - } catch (final IOException e) { - LOG.debug("Worker event monitor restart failed", e); - } - } - } - - private void abort() { - try { - channel.abort(); - } catch (final IOException e) { - // Eat error when closing channel. - } - } - } -} diff --git a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitorTest.java b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitorTest.java deleted file mode 100644 index ade3453..0000000 --- a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitorTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright the State of the Netherlands - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see http://www.gnu.org/licenses/. - */ -package nl.aerius.taskmanager.client.mq; - -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_QUEUE; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_UTILISATION; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_WORKER_SIZE; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.AMQP.Queue; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Consumer; - -import nl.aerius.taskmanager.client.BrokerConnectionFactory; -import nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.RabbitMQWorkerObserver; - -/** - * Test class for {@link RabbitMQWorkerMonitor}. - */ -class RabbitMQWorkerMonitorTest { - - private static final String TEST_QUEUENAME = "test"; - - private static ExecutorService executor; - - private RabbitMQWorkerMonitor monitor; - private Channel mockChannel; - - @BeforeAll - static void setupClass() { - executor = Executors.newSingleThreadExecutor(); - } - - @AfterAll - static void afterClass() { - executor.shutdown(); - } - - @BeforeEach - void setUp() throws Exception { - final Connection mockConnection = Mockito.mock(Connection.class); - mockChannel = Mockito.mock(Channel.class); - doReturn(mockChannel).when(mockConnection).createChannel(); - when(mockChannel.isOpen()).thenReturn(true); - final Queue.DeclareOk mockDeclareOk = Mockito.mock(Queue.DeclareOk.class); - doReturn(mockDeclareOk).when(mockChannel).queueDeclare(); - monitor = new RabbitMQWorkerMonitor(new BrokerConnectionFactory(executor) { - @Override - protected Connection createNewConnection() throws IOException { - return mockConnection; - } - }); - } - - @Test - void testReceiving() throws IOException { - final AtomicInteger sizeAI = new AtomicInteger(); - final AtomicInteger utilisationAI = new AtomicInteger(); - final RabbitMQWorkerObserver observer = (workerQueueName, size, utilisation) -> { - sizeAI.set(size); - utilisationAI.set(utilisation); - }; - monitor.addObserver(observer); - doAnswer(i -> { - final Map headers = new HashMap<>(); - - headers.put(HEADER_PARAM_QUEUE, TEST_QUEUENAME); - headers.put(HEADER_PARAM_WORKER_SIZE, 10); - headers.put(HEADER_PARAM_UTILISATION, 5); - ((Consumer) i.getArgument(2)).handleDelivery(null, null, new BasicProperties().builder().headers(headers).build(), null); - return null; - }).when(mockChannel).basicConsume(any(), eq(true), any()); - monitor.start(); - assertEquals(10, sizeAI.get(), "Expected received size doesn't match"); - assertEquals(5, utilisationAI.get(), "Expected utilisation doesn't match"); - monitor.removeObserver(observer); - } -} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index 2dd111c..c58db32 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -69,7 +69,7 @@ public void onWorkerFinished(final String messageId, final Map m } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress) { if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName); listeners.forEach(QueueWatchDogListener::reset); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java index 68c91a4..580eaad 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java @@ -22,8 +22,8 @@ /** * Class to be used at startup. The Scheduler should not start before it is known how many messages are still on the queue. - * This to register any work that is still on the queuue and to properly calculate load metrics. - * Because the Taskmanager is not aware of the tasks already on the queue and therefore otherwise these messaages won't be counted in the metrics. + * This to register any work that is still on the queue and to properly calculate load metrics. + * Because the Task Manager is not aware of the tasks already on the queue and therefore otherwise these messages won't be counted in the metrics. * This can result in the metrics being skewed, and thereby negatively reporting load metrics. */ public class StartupGuard implements WorkerSizeObserver { @@ -47,7 +47,7 @@ public void waitForOpen() throws InterruptedException { } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { synchronized (openSemaphore) { if (!open) { open = true; diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index 61a226a..cd1d251 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -42,6 +42,10 @@ import nl.aerius.taskmanager.domain.TaskSchedule; import nl.aerius.taskmanager.metrics.OpenTelemetryMetrics; import nl.aerius.taskmanager.metrics.PerformanceMetricsReporter; +import nl.aerius.taskmanager.metrics.RabbitMQUsageMetricsProvider; +import nl.aerius.taskmanager.metrics.TaskManagerMetricsRegister; +import nl.aerius.taskmanager.metrics.TaskManagerUsageMetricsProvider; +import nl.aerius.taskmanager.metrics.TaskManagerUsageMetricsWrapper; import nl.aerius.taskmanager.scheduler.TaskScheduler; import nl.aerius.taskmanager.scheduler.TaskScheduler.TaskSchedulerFactory; @@ -58,6 +62,7 @@ class TaskManager> { private final TaskSchedulerFactory schedulerFactory; private final WorkerSizeProviderProxy workerSizeObserverProxy; private final Map buckets = new HashMap<>(); + private final TaskManagerUsageMetricsWrapper taskManagerMetrics; public TaskManager(final ExecutorService executorService, final ScheduledExecutorService scheduledExecutorService, final AdaptorFactory factory, final TaskSchedulerFactory schedulerFactory, final WorkerSizeProviderProxy workerSizeObserverProxy) { @@ -66,6 +71,7 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto this.factory = factory; this.schedulerFactory = schedulerFactory; this.workerSizeObserverProxy = workerSizeObserverProxy; + this.taskManagerMetrics = new TaskManagerUsageMetricsWrapper(OpenTelemetryMetrics.METER); } /** @@ -78,6 +84,7 @@ public void updateTaskScheduler(final TaskSchedule schedule) throws Interrupt // Set up scheduler with worker pool final String workerQueueName = schedule.getWorkerQueueName(); final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType()); + if (!buckets.containsKey(workerQueueName)) { LOG.info("Added scheduler for worker queue {}", workerQueueName); buckets.put(workerQueueName, new TaskScheduleBucket(workerQueueConfig)); @@ -112,6 +119,7 @@ public void removeTaskScheduler(final String workerQueueName) { public void shutdown() { buckets.forEach((k, v) -> v.shutdown()); buckets.clear(); + taskManagerMetrics.close(); } private class TaskScheduleBucket { @@ -129,17 +137,23 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep taskScheduler = schedulerFactory.createScheduler(queueConfig); workerProducer = factory.createWorkerProducer(queueConfig); final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler); + final TaskManagerUsageMetricsProvider taskManagerUsageMetrics = new TaskManagerUsageMetricsProvider(workerQueueName); + final TaskManagerMetricsRegister taskManagerMetricsRegister = new TaskManagerMetricsRegister(taskManagerUsageMetrics, startupGuard); final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(), - OpenTelemetryMetrics.METER, startupGuard); + OpenTelemetryMetrics.METER); watchDog.addQueueWatchDogListener(workerPool); watchDog.addQueueWatchDogListener(taskScheduler); watchDog.addQueueWatchDogListener(reporter); + watchDog.addQueueWatchDogListener(taskManagerMetricsRegister); workerProducer.addWorkerProducerHandler(reporter); + workerProducer.addWorkerProducerHandler(taskManagerMetricsRegister); workerProducer.addWorkerProducerHandler(watchDog); - workerSizeObserverProxy.addObserver(workerQueueName, reporter); + final RabbitMQUsageMetricsProvider rabbitMQWorkerSizeObserver = new RabbitMQUsageMetricsProvider(workerQueueName); + workerSizeObserverProxy.addObserver(workerQueueName, rabbitMQWorkerSizeObserver); + workerSizeObserverProxy.addObserver(workerQueueName, taskManagerMetricsRegister); workerSizeObserverProxy.addObserver(workerQueueName, workerPool); workerSizeObserverProxy.addObserver(workerQueueName, watchDog); // startup Guard should be the last observer added as it will unlock the task dispatcher @@ -158,6 +172,9 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep // Wait for worker queue to be empty. startupGuard.waitForOpen(); LOG.info("Starting task scheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig); + taskManagerMetrics.addRabbitMQUsageMetricsProvider(rabbitMQWorkerSizeObserver); + taskManagerMetrics.addWorkerPoolUsageMetricsProvider(workerPool); + taskManagerMetrics.addTaskManagerUsageMetricsProvider(taskManagerUsageMetrics); dispatcher.run(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); @@ -220,6 +237,7 @@ private void removeTaskConsumer(final String taskQueueName) { public void shutdown() { dispatcher.shutdown(); workerProducer.shutdown(); + taskManagerMetrics.remove(workerQueueName); WorkerPoolMetrics.removeMetrics(workerQueueName); taskConsumers.forEach((k, v) -> v.shutdown()); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index 10b0332..61aac1a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory; import nl.aerius.taskmanager.adaptor.WorkerProducer; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.domain.QueueWatchDogListener; @@ -34,6 +33,7 @@ import nl.aerius.taskmanager.domain.TaskRecord; import nl.aerius.taskmanager.domain.WorkerUpdateHandler; import nl.aerius.taskmanager.exception.NoFreeWorkersException; +import nl.aerius.taskmanager.metrics.UsageMetricsProvider; /** * Class to manage workers. Contains a list of all available workers, which are: free workers, reserved workers and running workers. @@ -41,7 +41,7 @@ *

Reserved workers are workers that are waiting for a task to become available on the queue. *

Running workers are workers for that are busy running the task and are waiting for the task to finish. */ -class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMetrics, QueueWatchDogListener { +class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, UsageMetricsProvider, QueueWatchDogListener { private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class); @@ -84,24 +84,34 @@ public void sendTaskToWorker(final Task task) throws IOException { LOG.trace("[{}][taskId:{}] Task sent", workerQueueName, task.getId()); } - public int getWorkerSize() { + @Override + public String getWorkerQueueName() { + return workerQueueName; + } + + @Override + public int getNumberOfWorkers() { synchronized (this) { return freeWorkers.availablePermits() + runningWorkers.size() + initialUnaccountedWorkers; } } - @Override public int getReportedWorkerSize() { return totalReportedWorkers; } @Override - public int getRunningWorkerSize() { + public int getNumberOfUsedWorkers() { synchronized (this) { return runningWorkers.size() + initialUnaccountedWorkers; } } + @Override + public int getNumberOfFreeWorkers() { + return freeWorkers.availablePermits(); + } + @Override public void onWorkerFinished(final String messageId, final Map messageMetaData) { releaseWorker(messageId); @@ -165,6 +175,17 @@ public void reserveWorker() { } } + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { + synchronized (this) { + if (!firstUpdateReceived) { + initialUnaccountedWorkers = numberOfMessages; + firstUpdateReceived = true; + } + updateNumberOfWorkers(numberOfWorkers); + } + } + /** * Sets the number of workers which are actually available. This number should * be determined on the number of workers that are actually in operation. @@ -175,23 +196,11 @@ public void reserveWorker() { * workers matches the actual number. * * @param numberOfWorkers Actual size of number of workers in operation - * @param numberOfMessages Actual total number of messages on the queue */ - @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - synchronized (this) { - if (!firstUpdateReceived) { - initialUnaccountedWorkers = numberOfMessages; - firstUpdateReceived = true; - } - updateNumberOfWorkers(numberOfWorkers); - } - } - private void updateNumberOfWorkers(final int numberOfWorkers) { final int previousTotalReportedWorkers = totalReportedWorkers; totalReportedWorkers = numberOfWorkers; - final int deltaWorkers = totalReportedWorkers - getWorkerSize(); + final int deltaWorkers = totalReportedWorkers - getNumberOfWorkers(); if (deltaWorkers > 0) { freeWorkers.release(deltaWorkers); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java index 23f1bf4..c75ef10 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java @@ -25,6 +25,7 @@ import io.opentelemetry.api.metrics.ObservableDoubleGauge; import nl.aerius.taskmanager.metrics.OpenTelemetryMetrics; +import nl.aerius.taskmanager.metrics.UsageMetricsProvider; /** * Set up metric collection for this worker pool with the given type name. @@ -35,9 +36,13 @@ public final class WorkerPoolMetrics { private enum WorkerPoolMetricType { // @formatter:off - WORKER_SIZE(WorkerPool::getWorkerSize, "Number of workers based on internal state of taskmanager"), - CURRENT_WORKER_SIZE(WorkerPool::getReportedWorkerSize, "Current number of workers according to taskmanager"), - RUNNING_WORKER_SIZE(WorkerPool::getRunningWorkerSize, "Running (or occupied) number of workers according to taskmanager"); + WORKER_SIZE(UsageMetricsProvider::getNumberOfWorkers, "Number of workers based on internal state of taskmanager"), + @Deprecated + CURRENT_WORKER_SIZE(WorkerPool::getReportedWorkerSize, + "Current number of workers according to taskmanager (deprecated replaced with 'aer.taskmanager.workerppol.worker.usage')"), + @Deprecated + RUNNING_WORKER_SIZE(UsageMetricsProvider::getNumberOfUsedWorkers, + "Used number of workers according to taskmanager (deprecated replaced with 'aer.taskmanager.workerppol.worker.usage')"); // @formatter:on private final Function function; @@ -68,12 +73,13 @@ private WorkerPoolMetrics() { public static void setupMetrics(final WorkerPool workerPool, final String workerQueueName) { final Attributes attributes = OpenTelemetryMetrics.workerAttributes(workerQueueName); + for (final WorkerPoolMetricType metricType : WorkerPoolMetricType.values()) { REGISTERED_METRICS.put(gaugeIdentifier(workerQueueName, metricType), OpenTelemetryMetrics.METER.gaugeBuilder(metricType.getGaugeName()) - .setDescription(metricType.getDescription()) - .buildWithCallback( - result -> result.record(metricType.getValue(workerPool), attributes))); + .setDescription(metricType.getDescription()) + .buildWithCallback( + result -> result.record(metricType.getValue(workerPool), attributes))); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java index 7bcb427..54c57c3 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java @@ -25,7 +25,8 @@ public interface WorkerSizeObserver { * Gives the number of workers processes connected on the queue. * * @param numberOfWorkers number of number of workers processes - * @param numberOfMessages Actual total number of messages on the queue + * @param numberOfMessages Total number of messages on the queue + * @param numberOfMessagesInProgress Number of messages being processed by the workers */ - void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages); + void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java index 2a2a837..3e8cd0b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java @@ -16,6 +16,8 @@ */ package nl.aerius.taskmanager.metrics; +import java.util.function.ToDoubleBiFunction; + /** * Class to keep track of work load per worker type. * Each time a new task is added (dispatched) or removed (work finished) the number of running workers is counted and calculated how long @@ -32,41 +34,50 @@ class LoadMetric { /** * Total measured time since the last time {@link #process()} was called. */ - private int totalMeasureTime; + private long totalMeasureTime; /** - * Total registered load time. + * Measured free workers as the sum of number of free workers for specific time moments. Sum of (free workers * time frame). + * Dividing this number by the total time of the time frame will give an average number of free workers. */ - private double totalLoad; + private double total; /** * Number of workers running at a time. */ - private int runningWorkers; + private int usedWorkers; /** * Total number of available workers. */ - private int totalWorkers; + private int numberOfWorkers; + private final ToDoubleBiFunction countFunction; + private final ToDoubleBiFunction sumFunction; + + public LoadMetric(final ToDoubleBiFunction countFunction, final ToDoubleBiFunction sumFunction) { + this.countFunction = countFunction; + this.sumFunction = sumFunction; + } /** * Register change in number of running workers. * - * @param deltaActiveWorkers number of jobs on the workers being added or subtracted. - * @param totalWorkers Total number of available workers + * @param deltaUsedWorkers number of jobs on the workers being added or subtracted. + * @param numberOfWorkers Number of available workers */ - public synchronized void register(final int deltaActiveWorkers, final int totalWorkers) { - this.totalWorkers = totalWorkers; + public synchronized void register(final int deltaUsedWorkers, final int numberOfWorkers) { + this.numberOfWorkers = numberOfWorkers; final long newLast = System.currentTimeMillis(); final long delta = newLast - last; - totalLoad += delta * (totalWorkers > 0 ? (runningWorkers / (double) totalWorkers) : 0); + + total += delta * countFunction.applyAsDouble(numberOfWorkers, usedWorkers); totalMeasureTime += delta; last = newLast; - runningWorkers += deltaActiveWorkers; + usedWorkers += deltaUsedWorkers; } /** * Resets the metric state. Sets running workers to 0, and resets the average load time by calling process. */ public synchronized void reset() { - runningWorkers = 0; + usedWorkers = 0; process(); } @@ -77,11 +88,11 @@ public synchronized void reset() { */ public synchronized double process() { // Call register here to set the end time this moment. This will calculate workers running up till now as being active. - register(0, totalWorkers); - final double averageLoad = (totalLoad * 100.0) / totalMeasureTime; + register(0, numberOfWorkers); + final double averageTotal = totalMeasureTime > 0 ? sumFunction.applyAsDouble(total, totalMeasureTime) : 0; totalMeasureTime = 0; - totalLoad = 0; - return averageLoad; + total = 0; + return averageTotal; } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java index ef66e57..e381813 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java @@ -24,7 +24,7 @@ import io.opentelemetry.api.metrics.Meter; /** - * Class to help with opntelemetry metrics within the taskmanager. + * Class to help with Open Telemetry metrics within the TaskManager. */ public final class OpenTelemetryMetrics { @@ -43,6 +43,13 @@ public static Attributes workerAttributes(final String workerType) { .build(); } + public static Attributes workerAttributes(final String workerType, final String atttributeName, final String attributeValue) { + return Attributes.builder() + .put(WORKER_TYPE_ATTRIBUTE, workerIdentifier(workerType)) + .put(atttributeName, attributeValue) + .build(); + } + public static Attributes queueAttributes(final String workerQueueName, final String queueName) { return Attributes.builder() .put(WORKER_TYPE_ATTRIBUTE, workerIdentifier(workerQueueName)) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 7b07f78..88a0e96 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -31,9 +31,7 @@ import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.api.metrics.Meter; -import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; -import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.client.TaskMetrics; import nl.aerius.taskmanager.domain.QueueWatchDogListener; import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue; @@ -53,7 +51,7 @@ * * - Average load (in percentage) of all workers (of a certain type) together. */ -public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener, WorkerSizeObserver { +public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener { private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class); @@ -76,26 +74,19 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final DurationMetric dispatchedWorkerMetrics; private final Map workQueueMetrics = new HashMap<>(); private final DurationMetric workWorkerMetrics; - private final LoadMetric loadMetrics = new LoadMetric(); - - private final StartupGuard startupGuard; private final Meter meter; private final String queueGroupName; - private final DoubleGauge loadGauge; - private final Attributes workerAttributes; + private final Attributes attributesWorker; + // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue // as it doesn't have any metrics on it anymore. private final Set dispatchedTasks = new HashSet<>(); - private int numberOfWorkers; - - public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter, - final StartupGuard startupGuard) { + public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter) { this.queueGroupName = queueGroupName; this.meter = meter; - this.startupGuard = startupGuard; // Gauges for measuring number of tasks, and average duration time it took before a task was send to to the worker. // Measures by worker and per queue to the worker @@ -117,12 +108,9 @@ public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThr workQueueDurationGauge = createGauge("aer.taskmanager.work.queue.duration", "Average duration time a task from a queue took to process on a worker, including wait time."); - // Average load time (in percentage) of the work load on all workers together. - loadGauge = meter.gaugeBuilder("aer.taskmanager.work.load").setDescription("Percentage of workers used in the timeframe.").build(); - - workerAttributes = OpenTelemetryMetrics.workerAttributes(queueGroupName); - dispatchedWorkerMetrics = new DurationMetric(workerAttributes); - workWorkerMetrics = new DurationMetric(workerAttributes); + attributesWorker = OpenTelemetryMetrics.workerAttributes(queueGroupName); + dispatchedWorkerMetrics = new DurationMetric(attributesWorker); + workWorkerMetrics = new DurationMetric(attributesWorker); newScheduledThreadPool.scheduleWithFixedDelay(this::update, 1, UPDATE_TIME_SECONDS, TimeUnit.SECONDS); } @@ -140,7 +128,7 @@ public void onWorkDispatched(final String messageId, final Map m taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); dispatchedWorkerMetrics.register(taskMetrics); - loadMetrics.register(1, numberOfWorkers); + } @Override @@ -149,16 +137,7 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - loadMetrics.register(-1, numberOfWorkers); - } - @Override - public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - this.numberOfWorkers = numberOfWorkers; - if (!startupGuard.isOpen() && numberOfMessages > 0) { - LOG.info("Queue {} will be started with {} messages already on the queue.", queueGroupName, numberOfMessages); - loadMetrics.register(numberOfMessages, numberOfWorkers); - } } @Override @@ -167,7 +146,6 @@ public void reset() { dispatchedQueueMetrics.entrySet().forEach(e -> e.getValue().process()); dispatchedWorkerMetrics.process(); // work metrics not needed to be reset because they are about work already done. - loadMetrics.reset(); } private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) { @@ -180,7 +158,6 @@ private synchronized void update() { metrics(DISPATCH, dispatchedQueueCountGauge, dispatchedQueueWaitGauge, queueGroupName, dispatchedWorkerMetrics); metrics(WORK, workQueueMetrics, workWorkerCountGauge, workWorkerDurationGauge); metrics(WORK, workQueueCountGauge, workQueueDurationGauge, queueGroupName, workWorkerMetrics); - workLoad(); } catch (final RuntimeException e) { LOG.error("Update metrics failed.", e); } @@ -204,13 +181,4 @@ private static void metrics(final String prefixText, final DoubleGauge gauge, fi LOG.debug("{} for {}: {} ms/task (#tasks: {})", prefixText, name, metric.avgDuration(), count); } } - - private void workLoad() { - if (startupGuard.isOpen()) { - final double load = loadMetrics.process(); - - loadGauge.set(load, workerAttributes); - LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load)); - } - } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/RabbitMQUsageMetricsProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/RabbitMQUsageMetricsProvider.java new file mode 100644 index 0000000..24654c6 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/RabbitMQUsageMetricsProvider.java @@ -0,0 +1,65 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; + +public class RabbitMQUsageMetricsProvider implements WorkerSizeObserver, UsageMetricsProvider { + + private final String workerQueueName; + + private int numberOfWorkers; + private int numberOfMessages; + private int numberOfMessagesInProgress; + + public RabbitMQUsageMetricsProvider(final String workerQueueName) { + this.workerQueueName = workerQueueName; + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { + this.numberOfWorkers = numberOfWorkers; + this.numberOfMessages = numberOfMessages; + this.numberOfMessagesInProgress = numberOfMessagesInProgress; + } + + @Override + public String getWorkerQueueName() { + return workerQueueName; + } + + @Override + public int getNumberOfWorkers() { + return numberOfWorkers; + } + + @Override + public int getNumberOfUsedWorkers() { + return numberOfMessagesInProgress; + } + + @Override + public int getNumberOfFreeWorkers() { + return Math.max(0, numberOfWorkers - numberOfMessagesInProgress); + } + + @Override + public int getNumberOfWaiting() { + return Math.max(0, numberOfMessages - numberOfMessagesInProgress); + } + +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java new file mode 100644 index 0000000..74694ce --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java @@ -0,0 +1,70 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import nl.aerius.taskmanager.StartupGuard; +import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; +import nl.aerius.taskmanager.domain.QueueWatchDogListener; + +/** + * + */ +public class TaskManagerMetricsRegister implements WorkerProducerHandler, WorkerSizeObserver, QueueWatchDogListener { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerMetricsRegister.class); + + private final TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider; + private final StartupGuard startupGuard; + + private int numberOfWorkers; + + public TaskManagerMetricsRegister(final TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider, final StartupGuard startupGuard) { + this.taskManagerUsageMetricsProvider = taskManagerUsageMetricsProvider; + this.startupGuard = startupGuard; + } + + @Override + public void onWorkDispatched(final String messageId, final Map messageMetaData) { + taskManagerUsageMetricsProvider.register(1, numberOfWorkers); + } + + @Override + public void onWorkerFinished(final String messageId, final Map messageMetaData) { + taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + } + + @Override + public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { + this.numberOfWorkers = numberOfWorkers; + if (!startupGuard.isOpen() && numberOfMessages > 0) { + LOG.info("Queue {} will be started with {} messages already on the queue.", taskManagerUsageMetricsProvider.getWorkerQueueName(), + numberOfMessages); + taskManagerUsageMetricsProvider.register(numberOfMessages, numberOfWorkers); + } + } + + @Override + public void reset() { + taskManagerUsageMetricsProvider.reset(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProvider.java new file mode 100644 index 0000000..3c7d877 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProvider.java @@ -0,0 +1,77 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import java.util.function.ToDoubleBiFunction; + +public class TaskManagerUsageMetricsProvider implements UsageMetricsProvider { + + private static final ToDoubleBiFunction LOAD_SUM_FUNCTION = (total, totalMeasureTime) -> (total * 100.0) / totalMeasureTime; + private static final ToDoubleBiFunction COUNT_SUM_FUNCTION = (total, totalMeasureTime) -> Math.floor(total / totalMeasureTime); + + private final String workerQueueName; + private final LoadMetric load; + private final LoadMetric limit; + private final LoadMetric used; + private final LoadMetric free; + + public TaskManagerUsageMetricsProvider(final String workerQueueName) { + this.workerQueueName = workerQueueName; + load = new LoadMetric((numberOfWorkers, usedWorkers) -> (numberOfWorkers > 0 ? (usedWorkers / (double) numberOfWorkers) : 0), LOAD_SUM_FUNCTION); + limit = new LoadMetric((numberOfWorkers, usedWorkers) -> numberOfWorkers, COUNT_SUM_FUNCTION); + used = new LoadMetric((numberOfWorkers, usedWorkers) -> usedWorkers, COUNT_SUM_FUNCTION); + free = new LoadMetric((numberOfWorkers, usedWorkers) -> numberOfWorkers - usedWorkers, COUNT_SUM_FUNCTION); + } + + public synchronized void register(final int deltaUsedWorkers, final int numberOfWorkers) { + load.register(deltaUsedWorkers, numberOfWorkers); + limit.register(deltaUsedWorkers, numberOfWorkers); + used.register(deltaUsedWorkers, numberOfWorkers); + free.register(deltaUsedWorkers, numberOfWorkers); + } + + @Override + public String getWorkerQueueName() { + return workerQueueName; + } + + public double getLoad() { + return load.process(); + } + + @Override + public int getNumberOfWorkers() { + return (int) Math.max(0, limit.process()); + } + + @Override + public int getNumberOfUsedWorkers() { + return (int) Math.max(0, used.process()); + } + + @Override + public int getNumberOfFreeWorkers() { + return (int) Math.max(0, free.process()); + } + + public void reset() { + load.reset(); + limit.reset(); + used.reset(); + free.reset(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsWrapper.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsWrapper.java new file mode 100644 index 0000000..818a361 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsWrapper.java @@ -0,0 +1,86 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import io.opentelemetry.api.metrics.Meter; + +/** + * Class that wraps all Task Manager usage metrics. + * TaskSchedulerBuckets should register their metric providers with this class. + */ +public class TaskManagerUsageMetricsWrapper { + + private final UsageMetricsWrapper rabbitMQUsageMetrics; + private final UsageMetricsWrapper workerPoolUsageMetrics; + private final UsageMetricsWrapper taskManagerUsageMetrics; + private final UsageMetricsReporter loadUsageMetricsReporter; + + public TaskManagerUsageMetricsWrapper(final Meter meter) { + rabbitMQUsageMetrics = new UsageMetricsWrapper(meter, "aer.rabbitmq", true); + workerPoolUsageMetrics = new UsageMetricsWrapper(meter, "aer.taskmanager.workerpool", false); + taskManagerUsageMetrics = new UsageMetricsWrapper(meter, "aer.taskmanager", false); + loadUsageMetricsReporter = new UsageMetricsReporter(meter, "aer.taskmanager.worker.load", ""); + } + + /** + * Add the usage metric provider that registers metrics on the queue state of a worker from the RabbitMQ queue information. + * + * @param provider The provider that registers the RabbitMQ metrics. + */ + public void addRabbitMQUsageMetricsProvider(final UsageMetricsProvider provider) { + rabbitMQUsageMetrics.add(provider); + } + + /** + * Add the usage metric provider that registers metrics on the queue state in the internal worker pool. + * + * @param provider The provider that registers the worker pool metrics. + */ + public void addWorkerPoolUsageMetricsProvider(final UsageMetricsProvider provider) { + workerPoolUsageMetrics.add(provider); + } + + /** + * Add the usage metric provider that registers metrics on the usage state of a worker from the information collected by the Task Manager. + * + * @param provider The {@link TaskManagerUsageMetricsProvider}. + */ + public void addTaskManagerUsageMetricsProvider(final TaskManagerUsageMetricsProvider provider) { + taskManagerUsageMetrics.add(provider); + loadUsageMetricsReporter.addMetrics(provider.getWorkerQueueName(), provider::getLoad, + OpenTelemetryMetrics.workerAttributes(provider.getWorkerQueueName())); + } + + /** + * Remove metrics reporting for the given worker queue. + * + * @param workerQueueName worker queue to remove the metric reporting for + */ + public void remove(final String workerQueueName) { + rabbitMQUsageMetrics.remove(workerQueueName); + workerPoolUsageMetrics.remove(workerQueueName); + taskManagerUsageMetrics.remove(workerQueueName); + loadUsageMetricsReporter.removeMetrics(workerQueueName); + } + + public void close() { + rabbitMQUsageMetrics.close(); + workerPoolUsageMetrics.close(); + taskManagerUsageMetrics.close(); + loadUsageMetricsReporter.close(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsProvider.java new file mode 100644 index 0000000..d2fe9d6 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsProvider.java @@ -0,0 +1,35 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +/** + * + */ +public interface UsageMetricsProvider { + + String getWorkerQueueName(); + + int getNumberOfWorkers(); + + int getNumberOfUsedWorkers(); + + int getNumberOfFreeWorkers(); + + default int getNumberOfWaiting() { + return 0; + } +} \ No newline at end of file diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsReporter.java new file mode 100644 index 0000000..e4ee7a1 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsReporter.java @@ -0,0 +1,84 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.DoubleSupplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; + +/** + * Class that creates the Telemetry Gauge that uses the callback to record metrics when triggered. + * When triggered it will iterate over the list of added {@link UsageMetric}s. + */ +class UsageMetricsReporter { + + private static final Logger LOG = LoggerFactory.getLogger(UsageMetricsReporter.class); + + private record UsageMetric(DoubleSupplier metricSupplier, Attributes attributes) {} + + private final Map metricsMap = new HashMap<>(); + private final ObservableDoubleGauge gauge; + + public UsageMetricsReporter(final Meter meter, final String metricName, final String description) { + gauge = meter + .gaugeBuilder(metricName) + .setDescription(description) + .buildWithCallback(this::recordMetrics); + } + + private void recordMetrics(final ObservableDoubleMeasurement measurement) { + for (final Map.Entry entry : metricsMap.entrySet()) { + final UsageMetric metric = entry.getValue(); + + measurement.record(metric.metricSupplier.getAsDouble(), metric.attributes()); + } + LOG.debug("Workload for {}", measurement); + } + + /** + * Add a metric supplier for a specific worker queue/attributes. + * + * @param workerQueueName The worker queue this metric supplier is for + * @param metricSupplier Supplies the metric value when called + * @param attributes attributes for the metric + */ + public void addMetrics(final String workerQueueName, final DoubleSupplier metricSupplier, final Attributes attributes) { + metricsMap.put(workerQueueName, new UsageMetric(metricSupplier, attributes)); + } + + /** + * Removes the metric reporter for the given worker queue to not report the metric anymore. + * + * @param workerQueueName Worker queue to remove the metric reporter for + */ + public void removeMetrics(final String workerQueueName) { + metricsMap.remove(workerQueueName); + } + + public void close() { + gauge.close(); + metricsMap.clear(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsWrapper.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsWrapper.java new file mode 100644 index 0000000..a3ee3ef --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsWrapper.java @@ -0,0 +1,60 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import io.opentelemetry.api.metrics.Meter; + +/** + * Wraps several metrics to be reported on. + * The metrics supported are limit (i.e. number of workers available). + * + */ +class UsageMetricsWrapper { + private final boolean hasWaiting; + private final UsageMetricsReporter limitReporter; + private final UsageMetricsReporter usageReporter; + + public UsageMetricsWrapper(final Meter meter, final String metricPrefix, final boolean hasWaiting) { + this.hasWaiting = hasWaiting; + limitReporter = new UsageMetricsReporter(meter, metricPrefix + ".worker.limit", ""); + usageReporter = new UsageMetricsReporter(meter, metricPrefix + ".worker.usage", ""); + } + + public void add(final UsageMetricsProvider provider) { + final String workerQueueName = provider.getWorkerQueueName(); + + limitReporter.addMetrics(workerQueueName, provider::getNumberOfWorkers, OpenTelemetryMetrics.workerAttributes(workerQueueName)); + usageReporter.addMetrics(workerQueueName, provider::getNumberOfUsedWorkers, + OpenTelemetryMetrics.workerAttributes(workerQueueName, "state", "used")); + usageReporter.addMetrics(workerQueueName, provider::getNumberOfFreeWorkers, + OpenTelemetryMetrics.workerAttributes(workerQueueName, "state", "free")); + if (hasWaiting) { + usageReporter.addMetrics(workerQueueName, provider::getNumberOfWaiting, + OpenTelemetryMetrics.workerAttributes(workerQueueName, "state", "waiting")); + } + } + + public void remove(final String workerQueueName) { + limitReporter.removeMetrics(workerQueueName); + usageReporter.removeMetrics(workerQueueName); + } + + public void close() { + limitReporter.close(); + usageReporter.close(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java index 2611a03..3cf1ed3 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java @@ -56,6 +56,7 @@ public class RabbitMQQueueMonitor { private static final Logger LOG = LoggerFactory.getLogger(RabbitMQQueueMonitor.class); + private static final int TIMEOUT = (int) TimeUnit.SECONDS.toMillis(3); private final ObjectMapper objectMapper = new ObjectMapper(); @@ -117,8 +118,9 @@ public void updateWorkerQueueState(final String queueName, final WorkerSizeObser } else { final int numberOfWorkers = getJsonIntPrimitive(jsonObject, "consumers"); final int numberOfMessages = getJsonIntPrimitive(jsonObject, "messages"); + final int numberOfMessagesInProgress = getJsonIntPrimitive(jsonObject, "messages_unacknowledged"); - observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, numberOfMessagesInProgress); LOG.trace("[{}] active workers:{}", queueName, numberOfWorkers); } } catch (final URISyntaxException | IOException e) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java deleted file mode 100644 index e177c41..0000000 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright the State of the Netherlands - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see http://www.gnu.org/licenses/. - */ -package nl.aerius.taskmanager.mq; - -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.AERIUS_EVENT_EXCHANGE; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.EXCHANGE_TYPE; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_QUEUE; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_UTILISATION; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_WORKER_SIZE; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Channel; - -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; -import nl.aerius.taskmanager.client.BrokerConnectionFactory; - -/** - * Publishes on a regular basis worker metrics events to a dedicated RabbitMQ exchange. - * Consumers can listen to this exchange to retrieve updates. - * - * It uses caching to only send these events in case the value has changed. - */ -public class RabbitMQWorkerEventProducer { - - private static final Logger LOG = LoggerFactory.getLogger(RabbitMQWorkerEventProducer.class); - - private static final int REFRESH_TIME_SECONDS = 5; - - private final BrokerConnectionFactory factory; - private final Map metrics = new HashMap<>(); - private final ScheduledExecutorService executor; - private ScheduledFuture future; - // maps to keep track of last know values to only log differences. - private final Map logCacheSize = new HashMap<>(); - private final Map logCacheUtilisation = new HashMap<>(); - - /** - * Constructor. - * - * @param executor scheduled executor to run the repeating task on - * @param factory connection factory - */ - public RabbitMQWorkerEventProducer(final ScheduledExecutorService executor, final BrokerConnectionFactory factory) { - this.executor = executor; - this.factory = factory; - } - - /** - * Add metrics provider for the given worker queue name. - * - * @param workerQueueName name of the worker queue the metrics are - * @param workerMetrics provider of the metrics - */ - public void addMetrics(final String workerQueueName, final WorkerMetrics workerMetrics) { - metrics.put(workerQueueName, workerMetrics); - } - - /** - * Remove metrics provider for the given worker queue name. - * - * @param workerQueueName worker queue name to remove - */ - public void removeMetrics(final String workerQueueName) { - metrics.remove(workerQueueName); - } - - /** - * Start a scheduled thread to regularly publish new worker metrics. - */ - public void start() { - future = executor.scheduleWithFixedDelay(this::updateMetrics, 0, REFRESH_TIME_SECONDS, TimeUnit.SECONDS); - } - - /** - * Stops publishing metrics. - */ - public void shutdown() { - if (future != null) { - future.cancel(true); - } - } - - private void updateMetrics() { - try { - metrics.forEach((q, wpm) -> { - final int size = wpm.getReportedWorkerSize(); - final int utilisation = wpm.getRunningWorkerSize(); - - try { - publish(q, size, utilisation); - } catch (final IOException e) { - throw new UncheckedIOException(e); - } catch (final TimeoutException e) { - throw new RuntimeException(e); - } - }); - } catch (final RuntimeException e) { - LOG.error("Trying to send worker metrics update event failed: ", e); - } - } - - private void publish(final String queueName, final int size, final int utilisation) throws IOException, TimeoutException { - try (final Channel channel = factory.getConnection().createChannel()) { - channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE); - - final Map headers = new HashMap<>(); - headers.put(HEADER_PARAM_QUEUE, queueName); - headers.put(HEADER_PARAM_WORKER_SIZE, size); - headers.put(HEADER_PARAM_UTILISATION, utilisation); - final BasicProperties props = new BasicProperties().builder().headers(headers).build(); - channel.basicPublish(AERIUS_EVENT_EXCHANGE, "", props, null); - debugLogState(queueName, size, utilisation); - } - } - - private void debugLogState(final String queueName, final int size, final int utilisation) { - if (LOG.isDebugEnabled()) { - final Integer previousSize = Optional.ofNullable(logCacheSize.put(queueName, size)).orElse(0); - final Integer previousUtilisation = Optional.ofNullable(logCacheUtilisation.put(queueName, utilisation)).orElse(0); - - if (utilisation != previousUtilisation || size != previousSize) { - LOG.debug("Publish event for queue {} - size: {}, utilisation: {}", queueName, size, utilisation); - } - } - } -} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java index 0aa5a16..3aec259 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java @@ -29,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.client.BrokerConnectionFactory; @@ -55,7 +54,6 @@ public class RabbitMQWorkerSizeProvider implements WorkerSizeProviderProxy { private final ScheduledExecutorService executorService; private final BrokerConnectionFactory factory; private final RabbitMQChannelQueueEventsWatcher channelQueueEventsWatcher; - private final RabbitMQWorkerEventProducer eventProducer; /** * The time in seconds between each scheduled update. */ @@ -78,7 +76,6 @@ public RabbitMQWorkerSizeProvider(final ScheduledExecutorService executorService this.factory = factory; channelQueueEventsWatcher = new RabbitMQChannelQueueEventsWatcher(factory, this); refreshRateSeconds = factory.getConnectionConfiguration().getBrokerManagementRefreshRate(); - eventProducer = new RabbitMQWorkerEventProducer(executorService, factory); refreshDelayBeforeUpdateSeconds = Math.min(refreshRateSeconds / 2, DELAY_BEFORE_UPDATE_TIME_SECONDS); } @@ -94,9 +91,6 @@ public void addObserver(final String workerQueueName, final WorkerSizeObserver o } } observers.computeIfAbsent(workerQueueName, k -> new WorkerSizeObserverComposite()).add(observer); - if (observer instanceof WorkerMetrics) { - eventProducer.addMetrics(workerQueueName, (WorkerMetrics) observer); - } } /** @@ -116,14 +110,12 @@ public boolean removeObserver(final String workerQueueName) { if (monitor != null) { monitor.shutdown(); } - eventProducer.removeMetrics(workerQueueName); return observers.remove(workerQueueName) != null; } @Override public void start() throws IOException { channelQueueEventsWatcher.start(); - eventProducer.start(); if (refreshRateSeconds > 0) { running = true; executorService.scheduleWithFixedDelay(this::updateWorkerQueueState, INITIAL_DELAY_SECONDS, refreshRateSeconds, TimeUnit.SECONDS); @@ -135,7 +127,6 @@ public void shutdown() { for (final String key : new ArrayList<>(observers.keySet())) { removeObserver(key); } - eventProducer.shutdown(); channelQueueEventsWatcher.shutdown(); } @@ -178,10 +169,10 @@ public void add(final WorkerSizeObserver observer) { } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { for (final WorkerSizeObserver observer : observers) { try { - observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, numberOfMessagesInProgress); } catch (final RuntimeException e) { LOG.error("RuntimeException during onNumberOfWorkersUpdate in {}", observer.getClass(), e); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java index 9f3f945..fc77450 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java @@ -44,7 +44,7 @@ class PriorityTaskSchedulerMetrics { public void addMetric(final IntSupplier countSupplier, final String workerQueueName, final String clientQueueName) { metrics.put(clientQueueName, OpenTelemetryMetrics.METER .gaugeBuilder(METRIC_PREFIX) - .setDescription(DESCRIPTION) + .setDescription(DESCRIPTION + clientQueueName) .buildWithCallback( result -> result.record(countSupplier.getAsInt(), OpenTelemetryMetrics.queueAttributes(workerQueueName, clientQueueName)))); } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java index 1974a67..88bff47 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java @@ -55,13 +55,13 @@ protected LocalDateTime now() { IntStream.range(0, runningWorkers).forEach(i -> qwd.onWorkDispatched(String.valueOf(i), null)); IntStream.range(0, finishedWorkers).forEach(i -> qwd.onWorkerFinished(String.valueOf(i), null)); - qwd.onNumberOfWorkersUpdate(0, numberOfMessages); + qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); // reset should never trigger the first time the problem was reported. verify(listener, never()).reset(); // Fast forward 20 minutes to trigger reset if there is a problem. now.set(now.get().plusMinutes(20)); - qwd.onNumberOfWorkersUpdate(0, numberOfMessages); + qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); verify(listener, times(expected)).reset(); } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java index 73397c7..9c42e15 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java @@ -35,9 +35,9 @@ void testOpen() { final StartupGuard guard = new StartupGuard(); assertFalse(guard.isOpen(), "Guard should not be open."); - guard.onNumberOfWorkersUpdate(0, 1); + guard.onNumberOfWorkersUpdate(0, 1, 0); assertTrue(guard.isOpen(), "Guard should be open when onNumberOfWorkersUpdate is called."); - guard.onNumberOfWorkersUpdate(0, 1); + guard.onNumberOfWorkersUpdate(0, 1, 0); assertTrue(guard.isOpen(), "Guard should still remain open onNumberOfWorkersUpdate has been called."); } @@ -60,7 +60,7 @@ void testWaitForOpen() throws InterruptedException { // First wait for first semaphore to be unlocked. waitForStart.acquire(); assertFalse(guard.isOpen(), "Guard should not be open."); - guard.onNumberOfWorkersUpdate(1, 1); + guard.onNumberOfWorkersUpdate(1, 1, 0); // Wait for semaphore that is called after waitForOpen is unlocked. waitForOpen.acquire(); assertTrue(guard.isOpen(), "Guard should now be open."); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java index 79ea691..7555254 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java @@ -80,17 +80,17 @@ void after() throws InterruptedException { @Timeout(value = 3, unit = TimeUnit.SECONDS) void testNoFreeWorkers() { // Add Worker which will unlock - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); executor.execute(dispatcher); await().until(() -> dispatcher.getState() == State.WAIT_FOR_TASK); // Remove worker, 1 worker locked but at this point no actual workers available. - workerPool.onNumberOfWorkersUpdate(0, 0); + workerPool.onNumberOfWorkersUpdate(0, 0, 0); // Send task, should get NoFreeWorkersException in dispatcher. forwardTaskAsync(createTask(), null); // Dispatcher should go back to wait for worker to become available. await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); assertEquals(0, workerPool.getReportedWorkerSize(), "WorkerPool should be empty"); - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); assertEquals(1, workerPool.getReportedWorkerSize(), "WorkerPool should have 1 running"); } @@ -100,7 +100,7 @@ void testForwardTest() { final Task task = createTask(); final Future future = forwardTaskAsync(task, null); executor.execute(dispatcher); - workerPool.onNumberOfWorkersUpdate(1, 0); //add worker which will unlock + workerPool.onNumberOfWorkersUpdate(1, 0, 0); //add worker which will unlock await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); await().until(future::isDone); assertFalse(future.isCancelled(), "Taskconsumer must be unlocked at this point without error"); @@ -114,7 +114,7 @@ void testForwardDuplicateTask() { executor.execute(dispatcher); final Future future = forwardTaskAsync(task, null); await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); - workerPool.onNumberOfWorkersUpdate(2, 0); //add worker which will unlock + workerPool.onNumberOfWorkersUpdate(2, 0, 0); //add worker which will unlock // Now force the issue. assertSame(TaskDispatcher.State.WAIT_FOR_TASK, dispatcher.getState(), "Taskdispatcher must be waiting for task"); // Forwarding same Task object, so same id. @@ -134,19 +134,19 @@ void testExceptionDuringForward() { final Future future = forwardTaskAsync(task, null); await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); // Now open up a worker - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); // At this point the exception should be thrown. This could be the case when rabbitmq connection is lost for a second. // Wait for it to be unlocked again await().until(() -> dispatcher.getState() == State.WAIT_FOR_TASK); //simulate workerpool being reset - workerPool.onNumberOfWorkersUpdate(0, 0); + workerPool.onNumberOfWorkersUpdate(0, 0, 0); //now stop throwing exception to indicate connection is restored again workerProducer.setShutdownExceptionOnForward(false); //simulate connection being restored by first forwarding task again forwardTaskAsync(task, future); await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); //now simulate the worker being back - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); //should now be unlocked, but waiting for worker to be done await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); workerPool.onWorkerFinished(task.getId(), Map.of()); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java index 59bb280..0920df3 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java @@ -64,7 +64,7 @@ void setUp() throws IOException { doAnswer(a -> { // This will unblock the startup guard - ((WorkerSizeObserver) a.getArgument(1)).onNumberOfWorkersUpdate(0, 0); + ((WorkerSizeObserver) a.getArgument(1)).onNumberOfWorkersUpdate(0, 0, 0); return null; }).when(workerSizeProvider).addObserver(any(), any()); taskManager = new TaskManager<>(executor, scheduledExecutorService, factory, schedulerFactory, workerSizeProvider); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 0a205fd..30554f2 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -76,7 +76,7 @@ public void messageDelivered(final Message message) { @Test void testWorkerPoolSizing() throws IOException { assertEquals(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); - workerPool.onNumberOfWorkersUpdate(10, 0); + workerPool.onNumberOfWorkersUpdate(10, 0, 0); assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); assertEquals(10, numberOfWorkers, "Check if workerPool change handler called."); workerPool.reserveWorker(); @@ -89,15 +89,15 @@ void testWorkerPoolSizing() throws IOException { @Test void testWorkerPoolSizingWithInitialSize() throws IOException { - workerPool.onNumberOfWorkersUpdate(10, 5); - assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is 5"); - assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should match reported number of workers"); - workerPool.onNumberOfWorkersUpdate(10, 5); - assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is still 5"); - assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); + workerPool.onNumberOfWorkersUpdate(10, 5, 0); + assertEquals(5, workerPool.getNumberOfUsedWorkers(), "Check if workerPool size is 5"); + assertEquals(10, workerPool.getNumberOfWorkers(), "Internal worker size should match reported number of workers"); + workerPool.onNumberOfWorkersUpdate(10, 5, 0); + assertEquals(5, workerPool.getNumberOfUsedWorkers(), "Check if workerPool size is still 5"); + assertEquals(10, workerPool.getNumberOfWorkers(), "Internal worker size should still match reported number of workers"); IntStream.range(1, 6).forEach(a -> workerPool.onWorkerFinished("", null)); - assertEquals(0, workerPool.getRunningWorkerSize(), "After unknown tasks received running size should be 0"); - assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); + assertEquals(0, workerPool.getNumberOfUsedWorkers(), "After unknown tasks received running size should be 0"); + assertEquals(10, workerPool.getNumberOfWorkers(), "Internal worker size should still match reported number of workers"); } @Test @@ -108,26 +108,26 @@ void testNoFreeWorkers() { @Test void testWorkerPoolScaleDown() throws IOException { - workerPool.onNumberOfWorkersUpdate(5, 0); + workerPool.onNumberOfWorkersUpdate(5, 0, 0); final Task task1 = createAndSendTaskToWorker(); final Task task2 = createAndSendTaskToWorker(); final Task task3 = createAndSendTaskToWorker(); assertEquals(5, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after 2 workers running"); - workerPool.onNumberOfWorkersUpdate(1, 0); - assertEquals(3, workerPool.getWorkerSize(), + workerPool.onNumberOfWorkersUpdate(1, 0, 0); + assertEquals(3, workerPool.getNumberOfWorkers(), "Workpool size should match number of running tasks, since new total is lower than currently running"); assertEquals(1, workerPool.getReportedWorkerSize(), "Check if current workerPool size is same after decreasing # workers"); workerPool.releaseWorker(task1.getId()); - assertEquals(2, workerPool.getWorkerSize(), "Check if workerPool size is lower, but not yet same as total because still process running"); + assertEquals(2, workerPool.getNumberOfWorkers(), "Check if workerPool size is lower, but not yet same as total because still process running"); workerPool.releaseWorker(task2.getId()); - assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size is lower"); + assertEquals(1, workerPool.getNumberOfWorkers(), "Check if workerPool size is lower"); workerPool.releaseWorker(task3.getId()); - assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same"); + assertEquals(1, workerPool.getNumberOfWorkers(), "Check if workerPool size should remain the same"); } @Test void testReleaseTaskTwice() throws IOException { - workerPool.onNumberOfWorkersUpdate(2, 0); + workerPool.onNumberOfWorkersUpdate(2, 0, 0); final Task task1 = createAndSendTaskToWorker(); final String id = task1.getId(); workerPool.releaseWorker(id); @@ -140,19 +140,19 @@ void testReleaseTaskTwice() throws IOException { @Test void testMessageDeliverd() throws IOException { - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); createAndSendTaskToWorker(); assertNotSame(0, message.getDeliveryTag(), "Check if message is delivered"); } @Test void testReset() throws IOException { - workerPool.onNumberOfWorkersUpdate(5, 0); + workerPool.onNumberOfWorkersUpdate(5, 0, 0); createAndSendTaskToWorker(); createAndSendTaskToWorker(); - assertEquals(2, workerPool.getRunningWorkerSize(), "Should report 2 workers running."); + assertEquals(2, workerPool.getNumberOfUsedWorkers(), "Should report 2 workers running."); workerPool.reset(); - assertEquals(0, workerPool.getRunningWorkerSize(), "Should report no workers running after internal state reset."); + assertEquals(0, workerPool.getNumberOfUsedWorkers(), "Should report no workers running after internal state reset."); } private Task createAndSendTaskToWorker() throws IOException { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index d30590b..c2e4c52 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -16,7 +16,6 @@ */ package nl.aerius.taskmanager.metrics; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -25,7 +24,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.util.HashMap; @@ -46,7 +44,6 @@ import io.opentelemetry.api.metrics.DoubleGaugeBuilder; import io.opentelemetry.api.metrics.Meter; -import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.client.TaskMetrics; /** @@ -65,9 +62,9 @@ class PerformanceMetricsReporterTest { private @Mock Meter mockedMeter; private @Mock ScheduledExecutorService scheduledExecutorService; private @Captor ArgumentCaptor methodCaptor; - private @Captor ArgumentCaptor durationCaptor; + private @Captor ArgumentCaptor doubleValue1Captor; + private @Captor ArgumentCaptor doubleValue2Captor; - private StartupGuard startupGuard; private PerformanceMetricsReporter reporter; @BeforeEach @@ -82,15 +79,14 @@ void beforeEach() { return mockGaugeBuilder; }).when(mockedMeter).gaugeBuilder(any()); lenient().doReturn(mockGaugeBuilder).when(mockGaugeBuilder).setDescription(any()); - startupGuard = new StartupGuard(); - reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, startupGuard); - verify(scheduledExecutorService).scheduleWithFixedDelay(methodCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); + reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter); + verify(scheduledExecutorService).scheduleWithFixedDelay(methodCaptor.capture(), anyLong(), anyLong(), + any(TimeUnit.class)); } @Test void testOnWorkDispatched() { - startUp(10, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -99,7 +95,6 @@ void testOnWorkDispatched() { @Test void testOnWorkerFinished() { - startUp(2, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); @@ -109,28 +104,16 @@ void testOnWorkerFinished() { private void assertGaugeCalls(final String label, final String type, final double expected, final Predicate duration) { verify(mockedGauges.get("aer.taskmanager." + label)).set(eq(expected), any()); - verify(mockedGauges.get("aer.taskmanager.%s.%s".formatted(label, type))).set(durationCaptor.capture(), any()); + verify(mockedGauges.get("aer.taskmanager.%s.%s".formatted(label, type))).set(doubleValue1Captor.capture(), any()); verify(mockedGauges.get("aer.taskmanager.%s.queue".formatted(label))).set(eq(expected), any()); - verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(durationCaptor.capture(), any()); - durationCaptor.getAllValues() - .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); - } - - @Test - void testWorkLoad() throws InterruptedException { - startUp(4, 1); - reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); - reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); - methodCaptor.getValue().run(); - Thread.sleep(10); // Add a bit of delay to get some time frame between these 2 run calls. - methodCaptor.getValue().run(); - verify(mockedGauges.get("aer.taskmanager.work.load"), times(2)).set(durationCaptor.capture(), any()); - assertEquals(75.0, durationCaptor.getAllValues().get(1), "Expected workload of 75%"); + verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(doubleValue1Captor.capture(), + any()); + doubleValue1Captor.getAllValues() + .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); } @Test void testReset() throws InterruptedException { - startUp(4, 1); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); Thread.sleep(2); // Add a bit of delay to get some time frame between updates.. @@ -138,15 +121,6 @@ void testReset() throws InterruptedException { methodCaptor.getValue().run(); // Verify dispatched metrics have been reset. assertGaugeCalls("dispatched", "wait", 0.0, v -> v == 0.0); - - // Verify load metric have been reset. - verify(mockedGauges.get("aer.taskmanager.work.load"), times(1)).set(durationCaptor.capture(), any()); - assertEquals(0.0, durationCaptor.getAllValues().get(0), 1E-5, "Expected to have no workload anymore"); - } - - private void startUp(final int numberOfWorkers, final int numberOfMessages) { - reporter.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); - startupGuard.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); } private Map createMap(final String queueName, final long duration) { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java new file mode 100644 index 0000000..4a81d4e --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java @@ -0,0 +1,103 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import nl.aerius.taskmanager.StartupGuard; +import nl.aerius.taskmanager.client.TaskMetrics; + +/** + * Test class for {@link TaskManagerMetricsRegister}. + */ +@ExtendWith(MockitoExtension.class) +public class TaskManagerMetricsRegisterTest { + + private static final String QUEUE_1 = "queue 1"; + private static final String QUEUE_2 = "queue 2"; + + private @Mock TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider; + + private @Captor ArgumentCaptor taskManagerUsagerMetricsProviderCaptor; + + private StartupGuard startupGuard; + private TaskManagerMetricsRegister register; + + @BeforeEach + void beforeEach() { + startupGuard = new StartupGuard(); + register = new TaskManagerMetricsRegister(taskManagerUsageMetricsProvider, startupGuard); + } + + @Test + void testOnWorkDispatched() { + startUp(10, 0); + register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + register.onWorkDispatched("2", createMap(QUEUE_2, 200L)); + verifytaskManagerUsageMetricsProvider(2, 2); + } + + @Test + void testOnWorkerFinished() { + startUp(2, 0); + register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + register.onWorkerFinished("1", createMap(QUEUE_1, 100L)); + register.onWorkerFinished("2", createMap(QUEUE_2, 200L)); + verifytaskManagerUsageMetricsProvider(3, -1); + } + + @Test + void testReset() throws InterruptedException { + startUp(4, 1); + register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + register.onWorkDispatched("2", createMap(QUEUE_2, 200L)); + Thread.sleep(2); // Add a bit of delay to get some time frame between updates.. + register.reset(); + + // Verify load metric have been reset. + verify(taskManagerUsageMetricsProvider, times(1)).reset(); + } + + private void verifytaskManagerUsageMetricsProvider(final int times, final int sum) { + verify(taskManagerUsageMetricsProvider, times(times)).register(taskManagerUsagerMetricsProviderCaptor.capture(), + anyInt()); + assertEquals(sum, taskManagerUsagerMetricsProviderCaptor.getAllValues().stream().mapToInt(Integer::intValue).sum(), + "Should have registered a total sum of " + sum); + } + + private void startUp(final int numberOfWorkers, final int numberOfMessages) { + register.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, 0); + startupGuard.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, 0); + } + + private Map createMap(final String queueName, final long duration) { + return new TaskMetrics().duration(duration).queueName(queueName).start(System.currentTimeMillis() - 100).build(); + } +} diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProviderTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProviderTest.java new file mode 100644 index 0000000..2789a53 --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProviderTest.java @@ -0,0 +1,40 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +/** + * Test class for {@link TaskManagerUsageMetricsProvider}. + */ +class TaskManagerUsageMetricsProviderTest { + + /* + @Test + void testWorkLoad() throws InterruptedException { + startUp(4, 1); + reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); + methodCaptor.getValue().run(); + Thread.sleep(10); // Add a bit of delay to get some time frame between these 2 run calls. + methodCaptor.getValue().run(); + verify(mockedGauges.get("aer.taskmanager.work.load"), times(2)).set(doubleValue1Captor.capture(), any()); + assertEquals(75.0, doubleValue1Captor.getAllValues().get(1), "Expected workload of 50%"); + verify(mockedGauges.get("aer.taskmanager.work.free"), times(2)).set(doubleValue2Captor.capture(), any()); + assertEquals(2.0, doubleValue2Captor.getAllValues().get(1), "Expected number of free workers to be 4 - 2"); + } + + */ +} diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java index a572901..e0ab641 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java @@ -16,7 +16,6 @@ */ package nl.aerius.taskmanager.mq; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_QUEUE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -51,6 +50,7 @@ */ class RabbitMQChannelQueueEventsWatcherTest { private static final String TEST_QUEUENAME = "test"; + private static final String HEADER_PARAM_QUEUE = "queue"; private static ExecutorService executor; diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java index e82f254..4796d37 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java @@ -44,7 +44,7 @@ void testGetWorkerQueueState() { final ConnectionConfiguration configuration = ConnectionConfiguration.builder() .brokerHost(DUMMY).brokerPort(0).brokerUsername(DUMMY).brokerPassword(DUMMY).build(); final AtomicInteger workerSize = new AtomicInteger(); - final WorkerSizeObserver mwps = (numberOfWorkers, numberOfMessages) -> workerSize.set(numberOfWorkers); + final WorkerSizeObserver mwps = (numberOfWorkers, numberOfMessages, numberOfMessagesInProgress) -> workerSize.set(numberOfWorkers); final RabbitMQQueueMonitor rpm = new RabbitMQQueueMonitor(configuration) { @Override protected JsonNode getJsonResultFromApi(final String apiPath) throws IOException {