Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR for release #196

Merged
merged 10 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ The following configuration values are available:
| deltaEtagsPrefix | redis | delta:etags | The prefix for delta etags redis keys |
| lockPrefix | redis | rest-storage:locks | The prefix for lock redis keys |
| resourceCleanupAmount | redis | 100000 | The maximum amount of resources to clean in a single cleanup run |
| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to peform the storage cleanup. When set to _null_ no periodic storage cleanup is peformed |
| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to perform the storage cleanup. When set to _null_ no periodic storage cleanup is performed |
| rejectStorageWriteOnLowMemory | redis | false | When set to _true_, PUT requests with the x-importance-level header can be rejected when memory gets low |
| freeMemoryCheckIntervalMs | redis | 60000 | The interval in milliseconds to calculate the actual memory usage |
| redisReadyCheckIntervalMs | redis | -1 | The interval in milliseconds to calculate the "ready state" of redis. When value < 1, no "ready state" will be calculated |

### Configuration util

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>rest-storage</artifactId>
<version>3.1.7-SNAPSHOT</version>
<version>3.1.8-SNAPSHOT</version>
<name>rest-storage</name>
<description>
Persistence for REST resources in the filesystem or a redis database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static void main(String[] args) {
.storageType(ModuleConfiguration.StorageType.redis)
.redisReconnectAttempts(-1)
.redisPoolRecycleTimeoutMs(-1)
.redisReadyCheckIntervalMs(10000)
.resourceCleanupIntervalSec(10);

Vertx.vertx().deployVerticle(new RestStorageMod(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.ExecutorServiceFactory;
import io.vertx.core.streams.Pump;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
import io.vertx.ext.web.Router;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.exception.RestStorageExceptionFactory;
import org.swisspush.reststorage.redis.DefaultRedisProvider;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.redis.RedisStorage;
import org.swisspush.reststorage.util.ModuleConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.swisspush.reststorage;
package org.swisspush.reststorage.redis;

import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -11,7 +11,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.exception.RestStorageExceptionFactory;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.util.ModuleConfiguration;

import java.util.ArrayList;
Expand All @@ -36,26 +35,42 @@ public class DefaultRedisProvider implements RedisProvider {
private Redis redis;
private final AtomicBoolean connecting = new AtomicBoolean();
private RedisConnection client;
private RedisReadyProvider readyProvider;

private final AtomicReference<Promise<RedisAPI>> connectPromiseRef = new AtomicReference<>();

public DefaultRedisProvider(
Vertx vertx,
ModuleConfiguration configuration,
RestStorageExceptionFactory exceptionFactory
Vertx vertx,
ModuleConfiguration configuration,
RestStorageExceptionFactory exceptionFactory
) {
this.vertx = vertx;
this.configuration = configuration;
this.exceptionFactory = exceptionFactory;

maybeInitRedisReadyProvider();
}

private void maybeInitRedisReadyProvider() {
if (configuration.getRedisReadyCheckIntervalMs() > 0) {
this.readyProvider = new DefaultRedisReadyProvider(vertx, configuration.getRedisReadyCheckIntervalMs());
}
}

@Override
public Future<RedisAPI> redis() {
if (redisAPI != null) {
return Future.succeededFuture(redisAPI);
} else {
if(redisAPI == null) {
return setupRedisClient();
}
if(readyProvider == null) {
return Future.succeededFuture(redisAPI);
}
return readyProvider.ready(redisAPI).compose(ready -> {
if (ready) {
return Future.succeededFuture(redisAPI);
}
return Future.failedFuture("Not yet ready!");
});
}

private boolean reconnectEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.redis.client.RedisAPI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of the {@link RedisReadyProvider} based on the <code>INFO</code> command in Redis
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class DefaultRedisReadyProvider implements RedisReadyProvider {

private static final Logger log = LoggerFactory.getLogger(DefaultRedisReadyProvider.class);
private static final String DELIMITER = ":";
private static final String LOADING = "loading";
final AtomicBoolean redisReady = new AtomicBoolean(true);
final AtomicBoolean updateRedisReady = new AtomicBoolean(true);

/**
* Constructor defining the "ready-state" update interval
* @param vertx
* @param updateIntervalMs interval in ms how often to update the "ready-state"
*/
public DefaultRedisReadyProvider(Vertx vertx, int updateIntervalMs) {
vertx.setPeriodic(updateIntervalMs, l -> {
updateRedisReady.set(true);
});
}

@Override
public Future<Boolean> ready(RedisAPI redisAPI) {
if(updateRedisReady.compareAndSet(true, false)){
return updateRedisReadyState(redisAPI);
}
return Future.succeededFuture(redisReady.get());
}

/**
* Call the <code>INFO</code> command in Redis with a constraint to persistence related information
*
* @param redisAPI
* @return async boolean true when Redis is ready, otherwise false
*/
public Future<Boolean> updateRedisReadyState(RedisAPI redisAPI) {
return redisAPI.info(List.of("Persistence")).compose(response -> {
boolean ready = getReadyStateFromResponse(response.toString());
redisReady.set(ready);
return Future.succeededFuture(ready);
}, throwable -> {
log.error("Error reading redis info", throwable);
redisReady.set(false);
return Future.succeededFuture(false);
});
}

/**
* Check the response having a <code>loading:0</code> entry. If so, Redis is ready. When the response contains a
* <code>loading:1</code> entry or not related entry at all, we consider Redis to be not ready
*
* @param persistenceInfo the response from Redis _INFO_ command
* @return boolean true when Redis is ready, otherwise false
*/
private boolean getReadyStateFromResponse(String persistenceInfo) {
byte loadingValue;
try {
Optional<String> loadingOpt = persistenceInfo
.lines()
.filter(source -> source.startsWith(LOADING + DELIMITER))
.findAny();
if (loadingOpt.isEmpty()) {
log.warn("No 'loading' section received from redis. Unable to calculate ready state");
return false;
}
loadingValue = Byte.parseByte(loadingOpt.get().split(DELIMITER)[1]);
if (loadingValue == 0) {
return true;
}

} catch (NumberFormatException ex) {
log.warn("Invalid 'loading' section received from redis. Unable to calculate ready state");
return false;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.Future;
import io.vertx.redis.client.RedisAPI;

/**
* Provides the "ready state" of the Redis database. The connection to Redis may be already established, but Redis is not
* yet ready to be used
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public interface RedisReadyProvider {

/**
* Get the "ready state" of the Redis database.
*
* @param redisAPI API to access redis database
* @return An async boolean true when Redis can be used. Returns async boolean false otherwise or in case of an error
*/
Future<Boolean> ready(RedisAPI redisAPI);
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public Future<Optional<Float>> calculateCurrentMemoryUsage() {
log.error("Unable to get memory information from redis",
exceptionFactory.newException("redisProvider.redis() failed", ev.cause()));
promise.complete(Optional.empty());
return;
}
var redisAPI = ev.result();
redisAPI.info(Collections.singletonList("memory"), memoryInfo -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public enum StorageType {
private int redisReconnectAttempts = 0;
private int redisReconnectDelaySec = 30;
private int redisPoolRecycleTimeoutMs = 180_000;
private int redisReadyCheckIntervalMs = -1;
private String redisPassword = null;
private String redisUser = null;
private String expirablePrefix = "rest-storage:expirable";
Expand Down Expand Up @@ -170,6 +171,11 @@ public ModuleConfiguration redisPoolRecycleTimeoutMs(int redisPoolRecycleTimeout
return this;
}

public ModuleConfiguration redisReadyCheckIntervalMs(int redisReadyCheckIntervalMs) {
this.redisReadyCheckIntervalMs = redisReadyCheckIntervalMs;
return this;
}

@Deprecated(since = "3.0.17")
public ModuleConfiguration redisAuth(String redisAuth) {
this.redisAuth = redisAuth;
Expand Down Expand Up @@ -357,6 +363,10 @@ public int getRedisPoolRecycleTimeoutMs() {
return redisPoolRecycleTimeoutMs;
}

public int getRedisReadyCheckIntervalMs() {
return redisReadyCheckIntervalMs;
}

public boolean isRedisEnableTls() {
return redisEnableTls;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.impl.types.BulkType;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.swisspush.reststorage.util.ResourcesUtils;

import static org.mockito.Mockito.*;

/**
* Tests for the {@link DefaultRedisReadyProvider} class
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
@RunWith(VertxUnitRunner.class)
public class DefaultRedisReadyProviderTest {

private final String REDIS_INFO_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_1", true);
private final String REDIS_INFO_NOT_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_0", true);

private Vertx vertx;
private RedisAPI redisAPI;
private DefaultRedisReadyProvider readyProvider;

@Before
public void setUp() {
this.vertx = Vertx.vertx();
redisAPI = Mockito.mock(RedisAPI.class);
readyProvider = new DefaultRedisReadyProvider(vertx, 1000);
}

private void assertReadiness(TestContext testContext, AsyncResult<Boolean> event, Boolean expectedReadiness) {
testContext.assertTrue(event.succeeded());
testContext.assertEquals(expectedReadiness, event.result());
}

@Test
public void testRedisReady(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, true);
async.complete();
});
}

@Test
public void testRedisReadyMultipleCalls(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, true);
readyProvider.ready(redisAPI).onComplete(event2 -> {
assertReadiness(testContext, event2, true);
async.complete();
});
});

verify(redisAPI, times(1)).info(any());
}

@Test
public void testRedisNotReady(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_LOADING), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, false);
async.complete();
});
}

@Test
public void testRedisNotReadyInvalidInfoResponse(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer("some invalid info response"), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, false);
async.complete();
});
}

@Test
public void testRedisNotReadyExceptionWhenAccessingRedisAPI(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.failedFuture("Boooom"));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, false);
async.complete();
});
}
}
Loading
Loading