Skip to content

Commit

Permalink
Merge pull request #177 from swisspost/feature/issue176_log_redis_met…
Browse files Browse the repository at this point in the history
…rics

#176 log redis metrics
  • Loading branch information
mcweba authored Apr 2, 2024
2 parents 30d8ca4 + 4e0bec7 commit dc61a4a
Show file tree
Hide file tree
Showing 10 changed files with 494 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- TEST dependencies -->
<dependency>
Expand Down Expand Up @@ -406,6 +411,7 @@
<commons-codec.version>1.16.0</commons-codec.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-io.version>2.15.1</commons-io.version>
<guava.version>33.1.0-jre</guava.version>
<mockito.version>1.10.19</mockito.version>
<rest-assured.version>5.4.0</rest-assured.version>
<awaitility.version>4.2.0</awaitility.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class EventBusRedisMetricsPublisher implements RedisMetricsPublisher {

private final Vertx vertx;
private final String monitoringAddress;
private final String prefix;

public EventBusRedisMetricsPublisher(Vertx vertx, String monitoringAddress, String prefix) {
this.vertx = vertx;
this.monitoringAddress = monitoringAddress;
this.prefix = prefix;
}

@Override
public void publishMetric(String name, long value) {
vertx.eventBus().publish(monitoringAddress,
new JsonObject().put("name", prefix + name).put("action", "set").put("n", value));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.swisspush.reststorage.redis;

public interface RedisMetricsPublisher {

void publishMetric(String name, long value);
}
121 changes: 121 additions & 0 deletions src/main/java/org/swisspush/reststorage/redis/RedisMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package org.swisspush.reststorage.redis;

import com.google.common.base.Splitter;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static java.util.Collections.emptyList;

public class RedisMonitor {
private final Vertx vertx;
private final RedisProvider redisProvider;
private final int periodMs;
private final String expirableKey;
private long timer;
private final Logger log = LoggerFactory.getLogger(RedisMonitor.class);

private static final String DELIMITER = ":";

private final RedisMetricsPublisher publisher;

/**
* @param vertx vertx
* @param redisProvider RedisProvider
* @param monitoringAddress The EventBus address to send metrics to
* @param name name used in the metrics EventBus message
* @param expirableKey name of the expirable resources entry
* @param periodSec period in seconds to gather redis metrics
*/
public RedisMonitor(Vertx vertx, RedisProvider redisProvider, String monitoringAddress, String name, String expirableKey, int periodSec) {
this(vertx, redisProvider, expirableKey, periodSec,
new EventBusRedisMetricsPublisher(vertx, monitoringAddress, "redis." + name + ".")
);
}

public RedisMonitor(Vertx vertx, RedisProvider redisProvider, String expirableKey, int periodSec, RedisMetricsPublisher publisher) {
this.vertx = vertx;
this.redisProvider = redisProvider;
this.expirableKey = expirableKey;
this.periodMs = periodSec * 1000;
this.publisher = publisher;
}

public void start() {
timer = vertx.setPeriodic(periodMs, timer -> redisProvider.redis().onSuccess(redisAPI -> {
redisAPI.info(emptyList()).onComplete(event -> {
if (event.succeeded()) {
collectMetrics(event.result().toBuffer());
} else {
log.warn("Cannot collect INFO from redis", event.cause());
}
});

redisAPI.zcard(expirableKey, reply -> {
if (reply.succeeded()) {
long value = reply.result().toLong();
publisher.publishMetric("expirable", value);
} else {
log.warn("Cannot collect zcard from redis for key {}", expirableKey, reply.cause());
}
});

}).onFailure(throwable -> log.warn("Cannot collect INFO from redis", throwable)));
}

public void stop() {
if (timer != 0) {
vertx.cancelTimer(timer);
timer = 0;
}
}

private void collectMetrics(Buffer buffer) {
Map<String, String> map = new HashMap<>();

Splitter.on(System.lineSeparator()).omitEmptyStrings()
.trimResults().splitToList(buffer.toString()).stream()
.filter(input -> input != null && input.contains(DELIMITER)
&& !input.contains("executable")
&& !input.contains("config_file")).forEach(entry -> {
List<String> keyValue = Splitter.on(DELIMITER).omitEmptyStrings().trimResults().splitToList(entry);
if (keyValue.size() == 2) {
map.put(keyValue.get(0), keyValue.get(1));
}
});

log.debug("got redis metrics {}", map);

map.forEach((key, valueStr) -> {
long value;
try {
if (key.startsWith("db")) {
String[] pairs = valueStr.split(",");
for (String pair : pairs) {
String[] tokens = pair.split("=");
if (tokens.length == 2) {
value = Long.parseLong(tokens[1]);
publisher.publishMetric("keyspace." + key + "." + tokens[0], value);
} else {
log.warn("Invalid keyspace property. Will be ignored");
}
}
} else if (key.contains("_cpu_")) {
value = (long) (Double.parseDouble(valueStr) * 1000.0);
publisher.publishMetric(key, value);
} else if (key.contains("fragmentation_ratio")) {
value = (long) (Double.parseDouble(valueStr));
publisher.publishMetric(key, value);
} else {
value = Long.parseLong(valueStr);
publisher.publishMetric(key, value);
}
} catch (NumberFormatException e) {
log.warn("ignore field '{}' because '{}' doesnt look number-ish enough", key, valueStr);
}
});
}
}
18 changes: 18 additions & 0 deletions src/main/java/org/swisspush/reststorage/redis/RedisStorage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.swisspush.reststorage.redis;

import com.google.common.base.Strings;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class RedisStorage implements Storage {
private final Lock lock;

private final RedisProvider redisProvider;

private RedisMonitor redisMonitor;
private final Map<LuaScript, LuaScriptState> luaScripts = new HashMap<>();
private final DecimalFormat decimalFormat;

Expand Down Expand Up @@ -132,6 +135,21 @@ public RedisStorage(Vertx vertx, ModuleConfiguration config, RedisProvider redis
if (resourceCleanupIntervalSec != null) {
startPeriodicStorageCleanup(resourceCleanupIntervalSec * 1000L);
}

registerMetricsGathering(config);
}

private void registerMetricsGathering(ModuleConfiguration configuration){
String metricsAddress = configuration.getRedisPublishMetrcisAddress();
if(Strings.isNullOrEmpty(metricsAddress)) {
return;
}
String metricsPrefix = configuration.getRedisPublishMetrcisPrefix();
int metricRefreshPeriodSec = configuration.getRedisPublishMetrcisRefreshPeriodSec();

redisMonitor = new RedisMonitor(vertx, redisProvider, metricsAddress, metricsPrefix,
configuration.getExpirablePrefix(), metricRefreshPeriodSec);
redisMonitor.start();
}

private void startPeriodicStorageCleanup(long intervalMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public enum StorageType {
*/
@Deprecated(since = "3.0.17")
private String redisAuth = null;
private String redisPublishMetrcisAddress = null;
private String redisPublishMetrcisPrefix = "storage";
private int redisPublishMetrcisRefreshPeriodSec = 10;
private int redisReconnectAttempts = 0;
private int redisReconnectDelaySec = 30;
private int redisPoolRecycleTimeoutMs = 180_000;
Expand Down Expand Up @@ -172,6 +175,21 @@ public ModuleConfiguration redisAuth(String redisAuth) {
return this;
}

public ModuleConfiguration redisPublishMetrcisAddress(String redisPublishMetrcisAddress) {
this.redisPublishMetrcisAddress = redisPublishMetrcisAddress;
return this;
}

public ModuleConfiguration redisPublishMetrcisPrefix(String redisPublishMetrcisPrefix) {
this.redisPublishMetrcisPrefix = redisPublishMetrcisPrefix;
return this;
}

public ModuleConfiguration redisPublishMetrcisRefreshPeriodSec(int redisPublishMetrcisRefreshPeriodSec) {
this.redisPublishMetrcisRefreshPeriodSec = redisPublishMetrcisRefreshPeriodSec;
return this;
}

public ModuleConfiguration redisPassword(String redisPassword) {
this.redisPassword = redisPassword;
return this;
Expand Down Expand Up @@ -341,6 +359,22 @@ public String getRedisAuth() {
return redisAuth;
}

public String getRedisPublishMetrcisAddress() {
return redisPublishMetrcisAddress;
}

public String getRedisPublishMetrcisPrefix() {
return redisPublishMetrcisPrefix;
}

public int getRedisPublishMetrcisRefreshPeriodSec() {
if (redisPublishMetrcisRefreshPeriodSec < 1) {
log.debug("Ignoring value {}s for redisPublishMetrcisRefreshPersiodSec (too small) and use 1 instead", redisPublishMetrcisRefreshPeriodSec);
return 1;
}
return redisPublishMetrcisRefreshPeriodSec;
}

public String getRedisPassword() {
return redisPassword;
}
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/org/swisspush/reststorage/util/ResourcesUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.swisspush.reststorage.util;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;

/**
* <p>
* Utility class providing handy methods to deal with Resources.
* </p>
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class ResourcesUtils {

private static Logger log = LoggerFactory.getLogger(ResourcesUtils.class);

private ResourcesUtils() {
// prevent instantiation
}

/**
* <p>
* Loads the resource with the provided name from the classpath. When param {@code exceptionWhenNotFound}
* set to true, a {@link RuntimeException} is thrown when the resource cannot be loaded.
* </p>
*
* @param resourceName the name of the resource to load
* @param exceptionWhenNotFound throw a {@link RuntimeException} when the resource could not be loaded
* @throws RuntimeException when {@code exceptionWhenNotFound} is set to true and resource cannot be loaded
* @return The content of the resource or null
*/
public static String loadResource(String resourceName, boolean exceptionWhenNotFound) {
try {
URL url = Resources.getResource(resourceName);
return Resources.toString(url, Charsets.UTF_8);
} catch (Exception e) {
log.error("Error loading resource '{}'", resourceName, e);
if(exceptionWhenNotFound){
throw new RuntimeException("Error loading required resource '"+resourceName+"'");
}
return null;
}
}
}
Loading

0 comments on commit dc61a4a

Please sign in to comment.