Skip to content

Commit

Permalink
Merge pull request #35 from swisspush/issue33_propagate_changes
Browse files Browse the repository at this point in the history
Added update over the eventbus for other redisques instances (#33)
  • Loading branch information
mcweba authored Jul 12, 2017
2 parents ada10ee + 4d93c4d commit 45c05cf
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 19 deletions.
49 changes: 38 additions & 11 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.swisspush.redisques.handler.*;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.RedisquesConfiguration;
import org.swisspush.redisques.util.Timer;
import org.swisspush.redisques.util.RedisQuesTimer;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -30,6 +30,8 @@

public class RedisQues extends AbstractVerticle {

private static final String UPDATE_ADDRESS = "redisques.configuration-updated";

// State of each queue. Consuming means there is a message being processed.
private enum QueueState {
READY, CONSUMING
Expand Down Expand Up @@ -96,7 +98,7 @@ private String getQueueCheckLastexecKey() {
private int processorTimeout = 240000;

private long processorDelayMax;
private Timer timer;
private RedisQuesTimer timer;

private String redisHost;
private int redisPort;
Expand Down Expand Up @@ -153,7 +155,7 @@ public void start() {
checkInterval = modConfig.getCheckInterval();
processorTimeout = modConfig.getProcessorTimeout();
processorDelayMax = modConfig.getProcessorDelayMax();
timer = new Timer(vertx);
timer = new RedisQuesTimer(vertx);

redisHost = modConfig.getRedisHost();
redisPort = modConfig.getRedisPort();
Expand All @@ -173,6 +175,11 @@ public void start() {

RedisquesHttpRequestHandler.init(vertx, modConfig);

eb.consumer(UPDATE_ADDRESS, (Handler<Message<JsonObject>>) event -> {
log.info("Received configurations update");
setConfigurationValues(event.body(), false);
});

// Handles operations
eb.localConsumer(address, (Handler<Message<JsonObject>>) event -> {
String operation = event.body().getString(OPERATION);
Expand Down Expand Up @@ -435,28 +442,45 @@ private void getConfiguration(Message<JsonObject> event) {

private void setConfiguration(Message<JsonObject> event) {
JsonObject configurationValues = event.body().getJsonObject(PAYLOAD);
setConfigurationValues(configurationValues, true).setHandler(setConfigurationValuesEvent -> {
if(setConfigurationValuesEvent.succeeded()){
vertx.eventBus().publish(UPDATE_ADDRESS, configurationValues);
event.reply(setConfigurationValuesEvent.result());
} else {
event.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, setConfigurationValuesEvent.cause().getMessage()));
}
});
}

private Future<JsonObject> setConfigurationValues(JsonObject configurationValues, boolean validateOnly){
Future<JsonObject> future = Future.future();

if (configurationValues != null) {
List<String> notAllowedConfigurationValues = findNotAllowedConfigurationValues(configurationValues.fieldNames());
if(notAllowedConfigurationValues.isEmpty()){
try {
Long processorDelayMaxValue = configurationValues.getLong(PROCESSOR_DELAY_MAX);
if(processorDelayMaxValue == null){
event.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, "Value for configuration property '"+PROCESSOR_DELAY_MAX+"' is missing"));
return;
future.fail("Value for configuration property '"+PROCESSOR_DELAY_MAX+"' is missing");
return future;
}
if(!validateOnly) {
this.processorDelayMax = processorDelayMaxValue;
log.info("Updated configuration value of property '" + PROCESSOR_DELAY_MAX + "' to " + processorDelayMaxValue);
}
this.processorDelayMax = processorDelayMaxValue;
log.info("Updated configuration value of property '"+PROCESSOR_DELAY_MAX+"' to " + processorDelayMaxValue);
event.reply(new JsonObject().put(STATUS, OK));
future.complete(new JsonObject().put(STATUS, OK));
} catch(ClassCastException ex){
event.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, "Value for configuration property '"+PROCESSOR_DELAY_MAX+"' is not a number"));
future.fail("Value for configuration property '"+PROCESSOR_DELAY_MAX+"' is not a number");
}
} else {
String notAllowedConfigurationValuesString = Joiner.on(", ").join(notAllowedConfigurationValues);
event.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, "Not supported configuration values received: " + notAllowedConfigurationValuesString));
future.fail("Not supported configuration values received: " + notAllowedConfigurationValuesString);
}
} else {
event.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, "Configuration values missing"));
future.fail("Configuration values missing");
}

return future;
}

private List<String> findNotAllowedConfigurationValues(Set<String> configurationValues) {
Expand Down Expand Up @@ -711,6 +735,9 @@ private void rescheduleSendMessageAfterFailure(final String queue) {
}

private void processMessageWithTimeout(final String queue, final String payload, final Handler<SendResult> handler) {
if(processorDelayMax > 0){
log.info("About to process message for queue " + queue + " with a maximum delay of " + processorDelayMax + "ms");
}
timer.executeDelayedMax(processorDelayMax).setHandler(delayed -> {
if (delayed.failed()) {
log.error("Delayed execution has failed. Cause: " + delayed.cause().getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

import java.util.Random;

Expand All @@ -10,11 +12,13 @@
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class Timer {
public class RedisQuesTimer {
private final Vertx vertx;
private Random random;

public Timer(Vertx vertx) {
private Logger log = LoggerFactory.getLogger(RedisQuesTimer.class);

public RedisQuesTimer(Vertx vertx) {
this.vertx = vertx;
this.random = new Random();
}
Expand All @@ -30,7 +34,9 @@ public Future<Void> executeDelayedMax(long delayMs) {
Future<Void> future = Future.future();

if (delayMs > 0) {
vertx.setTimer(random.nextInt((int) (delayMs + 1)) + 1, delayed -> future.complete());
int delay = random.nextInt((int) (delayMs + 1)) + 1;
log.debug("starting timer with a delay of " + delay + "ms");
vertx.setTimer(delay, delayed -> future.complete());
} else {
future.complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
import org.junit.runner.RunWith;

/**
* Tests for {@link Timer} class.
* Tests for {@link RedisQuesTimer} class.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
@RunWith(VertxUnitRunner.class)
public class TimerTest {
public class RedisQuesTimerTest {

@Test
public void testExecuteDelayedLong(TestContext context){
Async async = context.async();
Timer timer = new Timer(Vertx.vertx());
RedisQuesTimer timer = new RedisQuesTimer(Vertx.vertx());
final int delayMs = 1500;
final long start = System.currentTimeMillis();

Expand All @@ -35,7 +35,7 @@ public void testExecuteDelayedLong(TestContext context){
@Test
public void testExecuteDelayedShort(TestContext context){
Async async = context.async();
Timer timer = new Timer(Vertx.vertx());
RedisQuesTimer timer = new RedisQuesTimer(Vertx.vertx());
final int delayMs = 50;
final long start = System.currentTimeMillis();

Expand All @@ -52,7 +52,7 @@ public void testExecuteDelayedShort(TestContext context){
@Test
public void testExecuteDelayedZero(TestContext context){
Async async = context.async();
Timer timer = new Timer(Vertx.vertx());
RedisQuesTimer timer = new RedisQuesTimer(Vertx.vertx());
final int delayMs = 0;
final long start = System.currentTimeMillis();

Expand Down

0 comments on commit 45c05cf

Please sign in to comment.