Skip to content

Commit

Permalink
#176 log redis metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Apr 2, 2024
1 parent 30d8ca4 commit 30a41f4
Show file tree
Hide file tree
Showing 10 changed files with 495 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- TEST dependencies -->
<dependency>
Expand Down Expand Up @@ -406,6 +411,7 @@
<commons-codec.version>1.16.0</commons-codec.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-io.version>2.15.1</commons-io.version>
<guava.version>33.1.0-jre</guava.version>
<mockito.version>1.10.19</mockito.version>
<rest-assured.version>5.4.0</rest-assured.version>
<awaitility.version>4.2.0</awaitility.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class EventBusRedisMetricsPublisher implements RedisMetricsPublisher {

private final Vertx vertx;
private final String monitoringAddress;
private final String prefix;

public EventBusRedisMetricsPublisher(Vertx vertx, String monitoringAddress, String prefix) {
this.vertx = vertx;
this.monitoringAddress = monitoringAddress;
this.prefix = prefix;
}

Check warning on line 16 in src/main/java/org/swisspush/reststorage/redis/EventBusRedisMetricsPublisher.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/EventBusRedisMetricsPublisher.java#L12-L16

Added lines #L12 - L16 were not covered by tests

@Override
public void publishMetric(String name, long value) {
vertx.eventBus().publish(monitoringAddress,
new JsonObject().put("name", prefix + name).put("action", "set").put("n", value));
}

Check warning on line 22 in src/main/java/org/swisspush/reststorage/redis/EventBusRedisMetricsPublisher.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/EventBusRedisMetricsPublisher.java#L20-L22

Added lines #L20 - L22 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.swisspush.reststorage.redis;

public interface RedisMetricsPublisher {

void publishMetric(String name, long value);
}
122 changes: 122 additions & 0 deletions src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package org.swisspush.reststorage.redis;

import com.google.common.base.Splitter;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RedisMonitor {
private final Vertx vertx;
private final RedisProvider redisProvider;
private final int periodMs;
private final String expirableKey;
private long timer;
private final Logger log = LoggerFactory.getLogger(RedisMonitor.class);

private static final String DELIMITER = ":";

private final RedisMetricsPublisher publisher;

/**
* @param vertx vertx
* @param redisProvider RedisProvider
* @param monitoringAddress The EventBus address to send metrics to
* @param name name used in the metrics EventBus message
* @param expirableKey name of the expirable resources entry
* @param periodSec period in seconds to gather redis metrics
*/
public RedisMonitor(Vertx vertx, RedisProvider redisProvider, String monitoringAddress, String name, String expirableKey, int periodSec) {
this(vertx, redisProvider, expirableKey, periodSec,

Check warning on line 35 in src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java#L35

Added line #L35 was not covered by tests
new EventBusRedisMetricsPublisher(vertx, monitoringAddress, "redis." + name + ".")
);
}

Check warning on line 38 in src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java#L38

Added line #L38 was not covered by tests

public RedisMonitor(Vertx vertx, RedisProvider redisProvider, String expirableKey, int periodSec, RedisMetricsPublisher publisher) {
this.vertx = vertx;
this.redisProvider = redisProvider;
this.expirableKey = expirableKey;
this.periodMs = periodSec * 1000;
this.publisher = publisher;
}

public void start() {
timer = vertx.setPeriodic(periodMs, timer -> redisProvider.redis().onSuccess(redisAPI -> {
redisAPI.info(new ArrayList<>()).onComplete(event -> {
if (event.succeeded()) {
collectMetrics(event.result().toBuffer());
} else {
log.warn("Cannot collect INFO from redis");

Check warning on line 54 in src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java#L54

Added line #L54 was not covered by tests
}
});

redisAPI.zcard(expirableKey, reply -> {
if (reply.succeeded()) {
long value = reply.result().toLong();
publisher.publishMetric("expirable", value);
} else {
log.warn("Cannot collect zcard from redis for key {}", expirableKey);

Check warning on line 63 in src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java#L60-L63

Added lines #L60 - L63 were not covered by tests
}
});

Check warning on line 65 in src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java#L65

Added line #L65 was not covered by tests

}).onFailure(throwable -> log.warn("Cannot collect INFO from redis", throwable)));
}

public void stop() {
if (timer != 0) {
vertx.cancelTimer(timer);
timer = 0;

Check warning on line 73 in src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java#L72-L73

Added lines #L72 - L73 were not covered by tests
}
}

Check warning on line 75 in src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java#L75

Added line #L75 was not covered by tests

private void collectMetrics(Buffer buffer) {
Map<String, String> map = new HashMap<>();

Splitter.on(System.lineSeparator()).omitEmptyStrings()
.trimResults().splitToList(buffer.toString()).stream()
.filter(input -> input != null && input.contains(DELIMITER)
&& !input.contains("executable")
&& !input.contains("config_file")).forEach(entry -> {
List<String> keyValue = Splitter.on(DELIMITER).omitEmptyStrings().trimResults().splitToList(entry);
if (keyValue.size() == 2) {
map.put(keyValue.get(0), keyValue.get(1));
}
});

log.debug("got redis metrics {}", map);

map.forEach((key, valueStr) -> {
long value;
try {
if (key.startsWith("db")) {
String[] pairs = valueStr.split(",");
for (String pair : pairs) {
String[] tokens = pair.split("=");
if (tokens.length == 2) {
value = Long.parseLong(tokens[1]);
publisher.publishMetric("keyspace." + key + "." + tokens[0], value);
} else {
log.warn("Invalid keyspace property. Will be ignored");
}
}
} else if (key.contains("_cpu_")) {
value = (long) (Double.parseDouble(valueStr) * 1000.0);
publisher.publishMetric(key, value);
} else if (key.contains("fragmentation_ratio")) {
value = (long) (Double.parseDouble(valueStr));
publisher.publishMetric(key, value);
} else {
value = Long.parseLong(valueStr);
publisher.publishMetric(key, value);
}
} catch (NumberFormatException e) {
log.warn("ignore field '{}' because '{}' doesnt look number-ish enough", key, valueStr);
}
});
}
}
18 changes: 18 additions & 0 deletions src/main/java/org/swisspush/reststorage/redis/RedisStorage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.swisspush.reststorage.redis;

import com.google.common.base.Strings;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class RedisStorage implements Storage {
private final Lock lock;

private final RedisProvider redisProvider;

private RedisMonitor redisMonitor;
private final Map<LuaScript, LuaScriptState> luaScripts = new HashMap<>();
private final DecimalFormat decimalFormat;

Expand Down Expand Up @@ -132,6 +135,21 @@ public RedisStorage(Vertx vertx, ModuleConfiguration config, RedisProvider redis
if (resourceCleanupIntervalSec != null) {
startPeriodicStorageCleanup(resourceCleanupIntervalSec * 1000L);
}

registerMetricsGathering(config);
}

private void registerMetricsGathering(ModuleConfiguration configuration){
String metricsAddress = configuration.getRedisPublishMetrcisAddress();
if(Strings.isNullOrEmpty(metricsAddress)) {
return;
}
String metricsPrefix = configuration.getRedisPublishMetrcisPrefix();
int metricRefreshPeriodSec = configuration.getRedisPublishMetrcisRefreshPeriodSec();

Check warning on line 148 in src/main/java/org/swisspush/reststorage/redis/RedisStorage.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisStorage.java#L147-L148

Added lines #L147 - L148 were not covered by tests

redisMonitor = new RedisMonitor(vertx, redisProvider, metricsAddress, metricsPrefix,
configuration.getExpirablePrefix(), metricRefreshPeriodSec);
redisMonitor.start();

Check warning on line 152 in src/main/java/org/swisspush/reststorage/redis/RedisStorage.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/redis/RedisStorage.java#L150-L152

Added lines #L150 - L152 were not covered by tests
}

private void startPeriodicStorageCleanup(long intervalMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public enum StorageType {
*/
@Deprecated(since = "3.0.17")
private String redisAuth = null;
private String redisPublishMetrcisAddress = null;
private String redisPublishMetrcisPrefix = "storage";
private int redisPublishMetrcisRefreshPeriodSec = 10;
private int redisReconnectAttempts = 0;
private int redisReconnectDelaySec = 30;
private int redisPoolRecycleTimeoutMs = 180_000;
Expand Down Expand Up @@ -172,6 +175,21 @@ public ModuleConfiguration redisAuth(String redisAuth) {
return this;
}

public ModuleConfiguration redisPublishMetrcisAddress(String redisPublishMetrcisAddress) {
this.redisPublishMetrcisAddress = redisPublishMetrcisAddress;
return this;
}

public ModuleConfiguration redisPublishMetrcisPrefix(String redisPublishMetrcisPrefix) {
this.redisPublishMetrcisPrefix = redisPublishMetrcisPrefix;
return this;
}

public ModuleConfiguration redisPublishMetrcisRefreshPeriodSec(int redisPublishMetrcisRefreshPeriodSec) {
this.redisPublishMetrcisRefreshPeriodSec = redisPublishMetrcisRefreshPeriodSec;
return this;
}

public ModuleConfiguration redisPassword(String redisPassword) {
this.redisPassword = redisPassword;
return this;
Expand Down Expand Up @@ -341,6 +359,22 @@ public String getRedisAuth() {
return redisAuth;
}

public String getRedisPublishMetrcisAddress() {
return redisPublishMetrcisAddress;
}

public String getRedisPublishMetrcisPrefix() {
return redisPublishMetrcisPrefix;
}

public int getRedisPublishMetrcisRefreshPeriodSec() {
if (redisPublishMetrcisRefreshPeriodSec < 1) {
log.debug("Ignoring value {}s for redisPublishMetrcisRefreshPersiodSec (too small) and use 1 instead", redisPublishMetrcisRefreshPeriodSec);
return 1;
}
return redisPublishMetrcisRefreshPeriodSec;
}

public String getRedisPassword() {
return redisPassword;
}
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.swisspush.reststorage.util;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;

/**
* <p>
* Utility class providing handy methods to deal with Resources.
* </p>
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class ResourcesUtils {

private static Logger log = LoggerFactory.getLogger(ResourcesUtils.class);

private ResourcesUtils() {
// prevent instantiation
}

/**
* <p>
* Loads the resource with the provided name from the classpath. When param {@code exceptionWhenNotFound}
* set to true, a {@link RuntimeException} is thrown when the resource cannot be loaded.
* </p>
*
* @param resourceName the name of the resource to load
* @param exceptionWhenNotFound throw a {@link RuntimeException} when the resource could not be loaded
* @throws RuntimeException when {@code exceptionWhenNotFound} is set to true and resource cannot be loaded
* @return The content of the resource or null
*/
public static String loadResource(String resourceName, boolean exceptionWhenNotFound) {
try {
URL url = Resources.getResource(resourceName);
return Resources.toString(url, Charsets.UTF_8);
} catch (Exception e) {
log.error("Error loading resource '"+resourceName+"'", e);

Check warning on line 41 in src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java#L40-L41

Added lines #L40 - L41 were not covered by tests
if(exceptionWhenNotFound){
throw new RuntimeException("Error loading required resource '"+resourceName+"'");

Check warning on line 43 in src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java#L43

Added line #L43 was not covered by tests
}
return null;

Check warning on line 45 in src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java#L45

Added line #L45 was not covered by tests
}
}
}
Loading

0 comments on commit 30a41f4

Please sign in to comment.