diff --git a/README.md b/README.md
index d860fff..6e99821 100644
--- a/README.md
+++ b/README.md
@@ -219,9 +219,10 @@ The following configuration values are available:
| deltaEtagsPrefix | redis | delta:etags | The prefix for delta etags redis keys |
| lockPrefix | redis | rest-storage:locks | The prefix for lock redis keys |
| resourceCleanupAmount | redis | 100000 | The maximum amount of resources to clean in a single cleanup run |
-| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to peform the storage cleanup. When set to _null_ no periodic storage cleanup is peformed |
+| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to perform the storage cleanup. When set to _null_ no periodic storage cleanup is performed |
| rejectStorageWriteOnLowMemory | redis | false | When set to _true_, PUT requests with the x-importance-level header can be rejected when memory gets low |
| freeMemoryCheckIntervalMs | redis | 60000 | The interval in milliseconds to calculate the actual memory usage |
+| redisReadyCheckIntervalMs | redis | -1 | The interval in milliseconds to calculate the "ready state" of redis. When value < 1, no "ready state" will be calculated |
### Configuration util
diff --git a/pom.xml b/pom.xml
index 9c4ed05..3a10923 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,7 +2,7 @@
4.0.0
org.swisspush
rest-storage
- 3.1.7-SNAPSHOT
+ 3.1.8-SNAPSHOT
rest-storage
Persistence for REST resources in the filesystem or a redis database
diff --git a/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java b/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java
index 2a91874..f84dfbc 100644
--- a/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java
+++ b/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java
@@ -20,6 +20,7 @@ public static void main(String[] args) {
.storageType(ModuleConfiguration.StorageType.redis)
.redisReconnectAttempts(-1)
.redisPoolRecycleTimeoutMs(-1)
+ .redisReadyCheckIntervalMs(10000)
.resourceCleanupIntervalSec(10);
Vertx.vertx().deployVerticle(new RestStorageMod(),
diff --git a/src/main/java/org/swisspush/reststorage/RestStorageHandler.java b/src/main/java/org/swisspush/reststorage/RestStorageHandler.java
index 95e1882..45c7332 100644
--- a/src/main/java/org/swisspush/reststorage/RestStorageHandler.java
+++ b/src/main/java/org/swisspush/reststorage/RestStorageHandler.java
@@ -8,7 +8,6 @@
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
-import io.vertx.core.spi.ExecutorServiceFactory;
import io.vertx.core.streams.Pump;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
import io.vertx.ext.web.Router;
diff --git a/src/main/java/org/swisspush/reststorage/RestStorageMod.java b/src/main/java/org/swisspush/reststorage/RestStorageMod.java
index baa6713..fccfb73 100644
--- a/src/main/java/org/swisspush/reststorage/RestStorageMod.java
+++ b/src/main/java/org/swisspush/reststorage/RestStorageMod.java
@@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.exception.RestStorageExceptionFactory;
+import org.swisspush.reststorage.redis.DefaultRedisProvider;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.redis.RedisStorage;
import org.swisspush.reststorage.util.ModuleConfiguration;
diff --git a/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisProvider.java
similarity index 90%
rename from src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java
rename to src/main/java/org/swisspush/reststorage/redis/DefaultRedisProvider.java
index f8ca4d6..062403d 100644
--- a/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java
+++ b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisProvider.java
@@ -1,4 +1,4 @@
-package org.swisspush.reststorage;
+package org.swisspush.reststorage.redis;
import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -11,7 +11,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.exception.RestStorageExceptionFactory;
-import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.util.ModuleConfiguration;
import java.util.ArrayList;
@@ -36,26 +35,42 @@ public class DefaultRedisProvider implements RedisProvider {
private Redis redis;
private final AtomicBoolean connecting = new AtomicBoolean();
private RedisConnection client;
+ private RedisReadyProvider readyProvider;
private final AtomicReference> connectPromiseRef = new AtomicReference<>();
public DefaultRedisProvider(
- Vertx vertx,
- ModuleConfiguration configuration,
- RestStorageExceptionFactory exceptionFactory
+ Vertx vertx,
+ ModuleConfiguration configuration,
+ RestStorageExceptionFactory exceptionFactory
) {
this.vertx = vertx;
this.configuration = configuration;
this.exceptionFactory = exceptionFactory;
+
+ maybeInitRedisReadyProvider();
+ }
+
+ private void maybeInitRedisReadyProvider() {
+ if (configuration.getRedisReadyCheckIntervalMs() > 0) {
+ this.readyProvider = new DefaultRedisReadyProvider(vertx, configuration.getRedisReadyCheckIntervalMs());
+ }
}
@Override
public Future redis() {
- if (redisAPI != null) {
- return Future.succeededFuture(redisAPI);
- } else {
+ if(redisAPI == null) {
return setupRedisClient();
}
+ if(readyProvider == null) {
+ return Future.succeededFuture(redisAPI);
+ }
+ return readyProvider.ready(redisAPI).compose(ready -> {
+ if (ready) {
+ return Future.succeededFuture(redisAPI);
+ }
+ return Future.failedFuture("Not yet ready!");
+ });
}
private boolean reconnectEnabled() {
diff --git a/src/main/java/org/swisspush/reststorage/redis/DefaultRedisReadyProvider.java b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisReadyProvider.java
new file mode 100644
index 0000000..1d3d6d0
--- /dev/null
+++ b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisReadyProvider.java
@@ -0,0 +1,93 @@
+package org.swisspush.reststorage.redis;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.redis.client.RedisAPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Default implementation of the {@link RedisReadyProvider} based on the INFO
command in Redis
+ *
+ * @author https://github.com/mcweba [Marc-Andre Weber]
+ */
+public class DefaultRedisReadyProvider implements RedisReadyProvider {
+
+ private static final Logger log = LoggerFactory.getLogger(DefaultRedisReadyProvider.class);
+ private static final String DELIMITER = ":";
+ private static final String LOADING = "loading";
+ final AtomicBoolean redisReady = new AtomicBoolean(true);
+ final AtomicBoolean updateRedisReady = new AtomicBoolean(true);
+
+ /**
+ * Constructor defining the "ready-state" update interval
+ * @param vertx
+ * @param updateIntervalMs interval in ms how often to update the "ready-state"
+ */
+ public DefaultRedisReadyProvider(Vertx vertx, int updateIntervalMs) {
+ vertx.setPeriodic(updateIntervalMs, l -> {
+ updateRedisReady.set(true);
+ });
+ }
+
+ @Override
+ public Future ready(RedisAPI redisAPI) {
+ if(updateRedisReady.compareAndSet(true, false)){
+ return updateRedisReadyState(redisAPI);
+ }
+ return Future.succeededFuture(redisReady.get());
+ }
+
+ /**
+ * Call the INFO
command in Redis with a constraint to persistence related information
+ *
+ * @param redisAPI
+ * @return async boolean true when Redis is ready, otherwise false
+ */
+ public Future updateRedisReadyState(RedisAPI redisAPI) {
+ return redisAPI.info(List.of("Persistence")).compose(response -> {
+ boolean ready = getReadyStateFromResponse(response.toString());
+ redisReady.set(ready);
+ return Future.succeededFuture(ready);
+ }, throwable -> {
+ log.error("Error reading redis info", throwable);
+ redisReady.set(false);
+ return Future.succeededFuture(false);
+ });
+ }
+
+ /**
+ * Check the response having a loading:0
entry. If so, Redis is ready. When the response contains a
+ * loading:1
entry or not related entry at all, we consider Redis to be not ready
+ *
+ * @param persistenceInfo the response from Redis _INFO_ command
+ * @return boolean true when Redis is ready, otherwise false
+ */
+ private boolean getReadyStateFromResponse(String persistenceInfo) {
+ byte loadingValue;
+ try {
+ Optional loadingOpt = persistenceInfo
+ .lines()
+ .filter(source -> source.startsWith(LOADING + DELIMITER))
+ .findAny();
+ if (loadingOpt.isEmpty()) {
+ log.warn("No 'loading' section received from redis. Unable to calculate ready state");
+ return false;
+ }
+ loadingValue = Byte.parseByte(loadingOpt.get().split(DELIMITER)[1]);
+ if (loadingValue == 0) {
+ return true;
+ }
+
+ } catch (NumberFormatException ex) {
+ log.warn("Invalid 'loading' section received from redis. Unable to calculate ready state");
+ return false;
+ }
+
+ return false;
+ }
+}
diff --git a/src/main/java/org/swisspush/reststorage/redis/RedisReadyProvider.java b/src/main/java/org/swisspush/reststorage/redis/RedisReadyProvider.java
new file mode 100644
index 0000000..62fd56a
--- /dev/null
+++ b/src/main/java/org/swisspush/reststorage/redis/RedisReadyProvider.java
@@ -0,0 +1,21 @@
+package org.swisspush.reststorage.redis;
+
+import io.vertx.core.Future;
+import io.vertx.redis.client.RedisAPI;
+
+/**
+ * Provides the "ready state" of the Redis database. The connection to Redis may be already established, but Redis is not
+ * yet ready to be used
+ *
+ * @author https://github.com/mcweba [Marc-Andre Weber]
+ */
+public interface RedisReadyProvider {
+
+ /**
+ * Get the "ready state" of the Redis database.
+ *
+ * @param redisAPI API to access redis database
+ * @return An async boolean true when Redis can be used. Returns async boolean false otherwise or in case of an error
+ */
+ Future ready(RedisAPI redisAPI);
+}
diff --git a/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java b/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java
index 447fdba..d5384c8 100644
--- a/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java
+++ b/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java
@@ -213,6 +213,7 @@ public Future> calculateCurrentMemoryUsage() {
log.error("Unable to get memory information from redis",
exceptionFactory.newException("redisProvider.redis() failed", ev.cause()));
promise.complete(Optional.empty());
+ return;
}
var redisAPI = ev.result();
redisAPI.info(Collections.singletonList("memory"), memoryInfo -> {
diff --git a/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java b/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java
index 5e5604f..2e13921 100644
--- a/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java
+++ b/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java
@@ -50,6 +50,7 @@ public enum StorageType {
private int redisReconnectAttempts = 0;
private int redisReconnectDelaySec = 30;
private int redisPoolRecycleTimeoutMs = 180_000;
+ private int redisReadyCheckIntervalMs = -1;
private String redisPassword = null;
private String redisUser = null;
private String expirablePrefix = "rest-storage:expirable";
@@ -170,6 +171,11 @@ public ModuleConfiguration redisPoolRecycleTimeoutMs(int redisPoolRecycleTimeout
return this;
}
+ public ModuleConfiguration redisReadyCheckIntervalMs(int redisReadyCheckIntervalMs) {
+ this.redisReadyCheckIntervalMs = redisReadyCheckIntervalMs;
+ return this;
+ }
+
@Deprecated(since = "3.0.17")
public ModuleConfiguration redisAuth(String redisAuth) {
this.redisAuth = redisAuth;
@@ -357,6 +363,10 @@ public int getRedisPoolRecycleTimeoutMs() {
return redisPoolRecycleTimeoutMs;
}
+ public int getRedisReadyCheckIntervalMs() {
+ return redisReadyCheckIntervalMs;
+ }
+
public boolean isRedisEnableTls() {
return redisEnableTls;
}
diff --git a/src/test/java/org/swisspush/reststorage/redis/DefaultRedisReadyProviderTest.java b/src/test/java/org/swisspush/reststorage/redis/DefaultRedisReadyProviderTest.java
new file mode 100644
index 0000000..a888d30
--- /dev/null
+++ b/src/test/java/org/swisspush/reststorage/redis/DefaultRedisReadyProviderTest.java
@@ -0,0 +1,106 @@
+package org.swisspush.reststorage.redis;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.redis.client.RedisAPI;
+import io.vertx.redis.client.impl.types.BulkType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.swisspush.reststorage.util.ResourcesUtils;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link DefaultRedisReadyProvider} class
+ *
+ * @author https://github.com/mcweba [Marc-Andre Weber]
+ */
+@RunWith(VertxUnitRunner.class)
+public class DefaultRedisReadyProviderTest {
+
+ private final String REDIS_INFO_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_1", true);
+ private final String REDIS_INFO_NOT_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_0", true);
+
+ private Vertx vertx;
+ private RedisAPI redisAPI;
+ private DefaultRedisReadyProvider readyProvider;
+
+ @Before
+ public void setUp() {
+ this.vertx = Vertx.vertx();
+ redisAPI = Mockito.mock(RedisAPI.class);
+ readyProvider = new DefaultRedisReadyProvider(vertx, 1000);
+ }
+
+ private void assertReadiness(TestContext testContext, AsyncResult event, Boolean expectedReadiness) {
+ testContext.assertTrue(event.succeeded());
+ testContext.assertEquals(expectedReadiness, event.result());
+ }
+
+ @Test
+ public void testRedisReady(TestContext testContext) {
+ Async async = testContext.async();
+ Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false)));
+
+ readyProvider.ready(redisAPI).onComplete(event -> {
+ assertReadiness(testContext, event, true);
+ async.complete();
+ });
+ }
+
+ @Test
+ public void testRedisReadyMultipleCalls(TestContext testContext) {
+ Async async = testContext.async();
+ Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false)));
+
+ readyProvider.ready(redisAPI).onComplete(event -> {
+ assertReadiness(testContext, event, true);
+ readyProvider.ready(redisAPI).onComplete(event2 -> {
+ assertReadiness(testContext, event2, true);
+ async.complete();
+ });
+ });
+
+ verify(redisAPI, times(1)).info(any());
+ }
+
+ @Test
+ public void testRedisNotReady(TestContext testContext) {
+ Async async = testContext.async();
+ Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_LOADING), false)));
+
+ readyProvider.ready(redisAPI).onComplete(event -> {
+ assertReadiness(testContext, event, false);
+ async.complete();
+ });
+ }
+
+ @Test
+ public void testRedisNotReadyInvalidInfoResponse(TestContext testContext) {
+ Async async = testContext.async();
+ Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer("some invalid info response"), false)));
+
+ readyProvider.ready(redisAPI).onComplete(event -> {
+ assertReadiness(testContext, event, false);
+ async.complete();
+ });
+ }
+
+ @Test
+ public void testRedisNotReadyExceptionWhenAccessingRedisAPI(TestContext testContext) {
+ Async async = testContext.async();
+ Mockito.when(redisAPI.info(any())).thenReturn(Future.failedFuture("Boooom"));
+
+ readyProvider.ready(redisAPI).onComplete(event -> {
+ assertReadiness(testContext, event, false);
+ async.complete();
+ });
+ }
+}
diff --git a/src/test/resources/redis_info_persistance_loading_0 b/src/test/resources/redis_info_persistance_loading_0
new file mode 100644
index 0000000..2521abd
--- /dev/null
+++ b/src/test/resources/redis_info_persistance_loading_0
@@ -0,0 +1,31 @@
+# Persistence
+loading:0
+async_loading:0
+current_cow_peak:0
+current_cow_size:0
+current_cow_size_age:0
+current_fork_perc:0.00
+current_save_keys_processed:0
+current_save_keys_total:0
+rdb_changes_since_last_save:108
+rdb_bgsave_in_progress:0
+rdb_last_save_time:1718713249
+rdb_last_bgsave_status:ok
+rdb_last_bgsave_time_sec:0
+rdb_current_bgsave_time_sec:-1
+rdb_saves:296
+rdb_last_cow_size:0
+rdb_last_load_keys_expired:0
+rdb_last_load_keys_loaded:0
+aof_enabled:0
+aof_rewrite_in_progress:0
+aof_rewrite_scheduled:0
+aof_last_rewrite_time_sec:-1
+aof_current_rewrite_time_sec:-1
+aof_last_bgrewrite_status:ok
+aof_rewrites:0
+aof_rewrites_consecutive_failures:0
+aof_last_write_status:ok
+aof_last_cow_size:0
+module_fork_in_progress:0
+module_fork_last_cow_size:0
diff --git a/src/test/resources/redis_info_persistance_loading_1 b/src/test/resources/redis_info_persistance_loading_1
new file mode 100644
index 0000000..de55047
--- /dev/null
+++ b/src/test/resources/redis_info_persistance_loading_1
@@ -0,0 +1,31 @@
+# Persistence
+loading:1
+async_loading:0
+current_cow_peak:0
+current_cow_size:0
+current_cow_size_age:0
+current_fork_perc:0.00
+current_save_keys_processed:0
+current_save_keys_total:0
+rdb_changes_since_last_save:108
+rdb_bgsave_in_progress:0
+rdb_last_save_time:1718713249
+rdb_last_bgsave_status:ok
+rdb_last_bgsave_time_sec:0
+rdb_current_bgsave_time_sec:-1
+rdb_saves:296
+rdb_last_cow_size:0
+rdb_last_load_keys_expired:0
+rdb_last_load_keys_loaded:0
+aof_enabled:0
+aof_rewrite_in_progress:0
+aof_rewrite_scheduled:0
+aof_last_rewrite_time_sec:-1
+aof_current_rewrite_time_sec:-1
+aof_last_bgrewrite_status:ok
+aof_rewrites:0
+aof_rewrites_consecutive_failures:0
+aof_last_write_status:ok
+aof_last_cow_size:0
+module_fork_in_progress:0
+module_fork_last_cow_size:0