Skip to content

Commit

Permalink
Merge pull request #215 from swisspost/develop
Browse files Browse the repository at this point in the history
PR for release
  • Loading branch information
mcweba authored Oct 7, 2024
2 parents cce77b0 + 36ff2e7 commit f065b38
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>4.1.2-SNAPSHOT</version>
<version>4.1.3-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
9 changes: 1 addition & 8 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,6 @@ private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<T
}

private <CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
// Setup a lookup table as we need to find by name further below.
Map<String, JsonObject> detailsByName = new HashMap<>(req.queuesJsonArr.size());
for (var it = (Iterator<JsonObject>) (Object) req.queuesJsonArr.iterator(); it.hasNext(); ) {
JsonObject detailJson = it.next();
String name = detailJson.getString(MONITOR_QUEUE_NAME);
detailsByName.put(name, detailJson);
}

dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> {
for (Queue queue : req.queues) {
if (event.containsKey(queue.name)) {
Expand All @@ -221,6 +213,7 @@ private static class GetQueueStatsRequest<CTX> {
private CTX mCtx;
private GetQueueStatsMentor<CTX> mentor;
private List<String> queueNames;
/* TODO: Why is 'queuesJsonArr' never accessed? Isn't this the reason of our class in the first place? */
private JsonArray queuesJsonArr;
private List<Queue> queues;
}
Expand Down
16 changes: 6 additions & 10 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,9 @@ private enum QueueState {
private final Semaphore getQueuesItemsCountRedisRequestQuota;

public RedisQues() {
this.exceptionFactory = newThriftyExceptionFactory();
this(null, null, null, newThriftyExceptionFactory(), new Semaphore(Integer.MAX_VALUE),
new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE));
log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE);
}

public RedisQues(
Expand Down Expand Up @@ -314,10 +311,6 @@ public void start(Promise<Void> promise) {
this.configurationProvider = new DefaultRedisquesConfigurationProvider(vertx, config());
}

if (this.dequeueStatisticCollector == null) {
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx);
}

if (this.periodicSkipScheduler == null) {
this.periodicSkipScheduler = new PeriodicSkipScheduler(vertx);
}
Expand All @@ -326,11 +319,14 @@ public void start(Promise<Void> promise) {
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
if (dequeueStatisticReportIntervalSec > 0) {
if (modConfig.isDequeueStatsEnabled()) {
dequeueStatisticEnabled = true;
Runnable publisher = newDequeueStatisticPublisher();
vertx.setPeriodic(1000L * dequeueStatisticReportIntervalSec, time -> publisher.run());
}
if (this.dequeueStatisticCollector == null) {
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx,dequeueStatisticEnabled);
}
queuesKey = modConfig.getRedisPrefix() + "queues";
queuesPrefix = modConfig.getRedisPrefix() + "queues:";
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ public void handle(AsyncResult<Response> handleQueues) {
var obj = new JsonObject().put(STATUS, OK).put(QUEUES, result);
long jsonCreateDurationMs = currentTimeMillis() - beginEpchMs;
if (jsonCreateDurationMs > 10) {
log.info("Creating JSON with {} entries did block this tread for {}ms",
log.info("Creating JSON with {} entries did block this thread for {}ms",
ctx.queueLengths.length, jsonCreateDurationMs);
}else{
log.debug("Creating JSON with {} entries did block this tread for {}ms",
log.debug("Creating JSON with {} entries did block this thread for {}ms",
ctx.queueLengths.length, jsonCreateDurationMs);
}
workerPromise.complete(obj);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;

public class DequeueStatisticCollector {
private static final Logger log = LoggerFactory.getLogger(DequeueStatisticCollector.class);
private final static String DEQUEUE_STATISTIC_DATA = "dequeueStatisticData";
private final static String DEQUEUE_STATISTIC_LOCK_PREFIX = "dequeueStatisticLock.";
private final SharedData sharedData;
private final boolean dequeueStatisticEnabled;

public DequeueStatisticCollector(Vertx vertx) {
public DequeueStatisticCollector(Vertx vertx, boolean dequeueStatisticEnabled) {
this.sharedData = vertx.sharedData();
this.dequeueStatisticEnabled = dequeueStatisticEnabled;
}

/**
Expand Down Expand Up @@ -107,6 +110,10 @@ public Future<Void> setDequeueStatistic(final String queueName, final DequeueSta
}

public Future<Map<String, DequeueStatistic>> getAllDequeueStatistics() {
// Check if dequeue statistics are enabled
if (!dequeueStatisticEnabled) {
return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions
}
Promise<Map<String, DequeueStatistic>> promise = Promise.promise();
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, DequeueStatistic>>>) asyncResult -> {
if (asyncResult.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,10 @@ public String toString() {
return asJsonObject().toString();
}

public boolean isDequeueStatsEnabled() {
return getDequeueStatisticReportIntervalSec() > 0;
}

/**
* RedisquesConfigurationBuilder class for simplified configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.swisspush.redisques.util;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.*;

@RunWith(VertxUnitRunner.class)
public class DequeueStatisticCollectorTest {

private Vertx vertx;
private SharedData sharedData;
private AsyncMap<String, DequeueStatistic> asyncMap;
private DequeueStatisticCollector dequeueStatisticCollectorEnabled;
private DequeueStatisticCollector dequeueStatisticCollectorDisabled;

@Before
public void setUp() {
vertx = mock(Vertx.class);
sharedData = mock(SharedData.class);
asyncMap = mock(AsyncMap.class);

// Mock sharedData.getAsyncMap to return asyncMap
doAnswer(invocation -> {
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
handler.handle(Future.succeededFuture(asyncMap));
return null;
}).when(sharedData).getAsyncMap(anyString(), any());

when(vertx.sharedData()).thenReturn(sharedData);

// Initialize DequeueStatisticCollector with enabled/disabled stats collection
dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true);
dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false);
}

@Test
public void testGetAllDequeueStatisticsEnabled(TestContext context) {
// Mock asyncMap.entries() to return a non-empty map
Map<String, DequeueStatistic> dequeueStats = new HashMap<>();
dequeueStats.put("queue1", new DequeueStatistic());
when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats));

// Test when dequeue statistics are enabled
Async async = context.async();
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.succeeded());
context.assertEquals(1, result.result().size());
async.complete();
});

// Verify that sharedData and asyncMap were used correctly
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
verify(asyncMap, times(1)).entries();
}

@Test
public void testGetAllDequeueStatisticsDisabled(TestContext context) {
// Test when dequeue statistics are disabled
Async async = context.async();
dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.succeeded());
context.assertTrue(result.result().isEmpty());
async.complete();
});

// Verify that sharedData and asyncMap were NOT used
verifyNoInteractions(sharedData);
verifyNoInteractions(asyncMap);
}

@Test
public void testGetAllDequeueStatisticsAsyncMapFailure(TestContext context) {
// Simulate failure in sharedData.getAsyncMap
doAnswer(invocation -> {
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
handler.handle(Future.failedFuture(new RuntimeException("Failed to retrieve async map")));
return null;
}).when(sharedData).getAsyncMap(anyString(), any());

// Test when asyncMap retrieval fails
Async async = context.async();
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.failed());
context.assertEquals("Failed to retrieve async map", result.cause().getMessage());
async.complete();
});

// Verify that sharedData.getAsyncMap was used, but asyncMap.entries() was not
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
verifyNoInteractions(asyncMap);
}

@Test
public void testGetAllDequeueStatisticsEntriesFailure(TestContext context) {
// Simulate success in sharedData.getAsyncMap, but failure in asyncMap.entries
doAnswer(invocation -> {
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
handler.handle(Future.succeededFuture(asyncMap));
return null;
}).when(sharedData).getAsyncMap(anyString(), any());

when(asyncMap.entries()).thenReturn(Future.failedFuture(new RuntimeException("Failed to retrieve entries")));

// Test when asyncMap.entries fails
Async async = context.async();
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.failed());
context.assertEquals("Failed to retrieve entries", result.cause().getMessage());
async.complete();
});

// Verify that sharedData.getAsyncMap and asyncMap.entries were used correctly
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
verify(asyncMap, times(1)).entries();
}
}

0 comments on commit f065b38

Please sign in to comment.