Skip to content

Commit

Permalink
Add some more extensive integration test coverage and fix a few bugs …
Browse files Browse the repository at this point in the history
…which were shaken out.
  • Loading branch information
dinowernli committed Jun 12, 2016
1 parent 4eb093c commit 937ed78
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.MethodType;
import me.dinowernli.grpc.prometheus.MonitoringServerInterceptor.Configuration;
import io.grpc.ServerCall;
import io.grpc.Status;
import me.dinowernli.grpc.prometheus.MonitoringServerInterceptor.Configuration;

/**
* A {@link ForwardingServerCall} which update Prometheus metrics based on the server-side actions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package me.dinowernli.grpc.prometheus;

import io.grpc.ForwardingServerCallListener;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerCall;

/**
Expand All @@ -10,12 +11,14 @@
class MonitoringServerCallListener<R>
extends ForwardingServerCallListener<R> {
private final ServerCall.Listener<R> delegate;
private final MethodType methodType;
private final ServerMetrics serverMetrics;

MonitoringServerCallListener(
ServerCall.Listener<R> delegate, ServerMetrics serverMetrics) {
ServerCall.Listener<R> delegate, ServerMetrics serverMetrics, MethodType methodType) {
this.delegate = delegate;
this.serverMetrics = serverMetrics;
this.methodType = methodType;
}

@Override
Expand All @@ -25,7 +28,9 @@ protected ServerCall.Listener<R> delegate() {

@Override
public void onMessage(R request) {
serverMetrics.recordMessageReceived();
if (methodType == MethodType.CLIENT_STREAMING || methodType == MethodType.BIDI_STREAMING) {
serverMetrics.recordStreamMessageReceived();
}
super.onMessage(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
public class MonitoringServerInterceptor implements ServerInterceptor {
private final Clock clock;
private final Configuration configuration;
private final ServerMetrics.Factory serverMetricsFactory;

public static MonitoringServerInterceptor create(Configuration configuration) {
return new MonitoringServerInterceptor(Clock.systemDefaultZone(), configuration);
return new MonitoringServerInterceptor(
Clock.systemDefaultZone(), configuration, new ServerMetrics.Factory(configuration));
}

private MonitoringServerInterceptor(Clock clock, Configuration configuration) {
private MonitoringServerInterceptor(
Clock clock, Configuration configuration, ServerMetrics.Factory serverMetricsFactory) {
this.clock = clock;
this.configuration = configuration;
this.serverMetricsFactory = serverMetricsFactory;
}

@Override
Expand All @@ -32,13 +36,11 @@ public <R, S> ServerCall.Listener<R> interceptCall(
ServerCall<S> call,
Metadata requestHeaders,
ServerCallHandler<R, S> next) {
// TODO(dino): If we cache the ServerMetrics instance, we can achieve an initial 0 value on
// registration and save some cycles here where we always create a new one per-request.
ServerMetrics metrics = ServerMetrics.create(method, configuration.getCollectorRegistry());
ServerMetrics metrics = serverMetricsFactory.createMetricsForMethod(method);
ServerCall<S> monitoringCall = new MonitoringServerCall<S>(
call, clock, method.getType(), metrics, configuration);
return new MonitoringServerCallListener<R>(
next.startCall(method, monitoringCall, requestHeaders), metrics);
next.startCall(method, monitoringCall, requestHeaders), metrics, method.getType());
}

/**
Expand All @@ -61,7 +63,7 @@ public static Configuration cheapMetricsOnly() {
*/
public static Configuration allMetrics() {
return new Configuration(
false /* isIncludeLatencyHistograms */, Optional.empty() /* collectorRegistry */);
true /* isIncludeLatencyHistograms */, Optional.empty() /* collectorRegistry */);
}

/**
Expand All @@ -78,8 +80,8 @@ public boolean isIncludeLatencyHistograms() {
}

/** Returns the {@link CollectorRegistry} used to record stats. */
public Optional<CollectorRegistry> getCollectorRegistry() {
return collectorRegistry;
public CollectorRegistry getCollectorRegistry() {
return collectorRegistry.orElse(CollectorRegistry.defaultRegistry);
}

private Configuration(
Expand Down
94 changes: 67 additions & 27 deletions src/main/java/me/dinowernli/grpc/prometheus/ServerMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import io.prometheus.client.SimpleCollector;
import me.dinowernli.grpc.prometheus.MonitoringServerInterceptor.Configuration;

/**
* Prometheus metric definitions used for server-side monitoring of grpc services.
Expand Down Expand Up @@ -47,53 +48,42 @@ class ServerMetrics {
.subsystem("server")
.name("msg_received_total")
.labelNames("grpc_type", "grpc_service", "grpc_method")
.help("Total number of messages received from the client.");
.help("Total number of stream messages received from the client.");

private static final Counter.Builder serverStreamMessagesSentBuilder = Counter.build()
.namespace("grpc")
.subsystem("server")
.name("msg_sent_total")
.labelNames("grpc_type", "grpc_service", "grpc_method")
.help("Total number of gRPC stream messages sent by the server.");
.help("Total number of stream messages sent by the server.");

private final Counter serverStarted;
private final Counter serverHandled;
private final Histogram serverHandledLatencySeconds;
private final Counter serverStreamMessagesReceived;
private final Counter serverStreamMessagesSent;
private final Optional<Histogram> serverHandledLatencySeconds;

private final String methodTypeLabel;
private final String serviceNameLabel;
private final String methodNameLabel;

/**
* Creates an instance of {@link ServerMetrics} for the supplied method. If the
* {@link CollectorRegistry} is empty, the default global registry is used.
*/
static <R, S> ServerMetrics create(
MethodDescriptor<R, S> method, Optional<CollectorRegistry> collectorRegistry) {
CollectorRegistry registry = collectorRegistry.orElse(CollectorRegistry.defaultRegistry);
String serviceName = MethodDescriptor.extractFullServiceName(method.getFullMethodName());

// Full method names are of the form: "full.serviceName/MethodName". We extract the last part.
String methodName = method.getFullMethodName().substring(serviceName.length() + 1);
return new ServerMetrics(method.getType().toString(), serviceName, methodName, registry);
}

private ServerMetrics(
String methodTypeLabel,
String serviceNameLabel,
String methodNameLabel,
CollectorRegistry registry) {
Counter serverStarted,
Counter serverHandled,
Counter serverStreamMessagesReceived,
Counter serverStreamMessagesSent,
Optional<Histogram> serverHandledLatencySeconds) {
this.methodNameLabel = methodNameLabel;
this.methodTypeLabel = methodTypeLabel;
this.serviceNameLabel = serviceNameLabel;

this.serverStarted = serverStartedBuilder.register(registry);
this.serverHandled = serverHandledBuilder.register(registry);
this.serverHandledLatencySeconds = serverHandledLatencySecondsBuilder.register(registry);
this.serverStreamMessagesReceived = serverStreamMessagesReceivedBuilder.register(registry);
this.serverStreamMessagesSent = serverStreamMessagesSentBuilder.register(registry);
this.serverStarted = serverStarted;
this.serverHandled = serverHandled;
this.serverStreamMessagesReceived = serverStreamMessagesReceived;
this.serverStreamMessagesSent = serverStreamMessagesSent;
this.serverHandledLatencySeconds = serverHandledLatencySeconds;
}

public void recordCallStarted() {
Expand All @@ -108,12 +98,62 @@ public void recordStreamMessageSent() {
addLabels(serverStreamMessagesSent).inc();
}

public void recordStreamMessageReceived() {
addLabels(serverStreamMessagesReceived).inc();
}

/**
* Only has any effect if monitoring is configured to include latency histograms. Otherwise, this
* does nothing.
*/
public void recordLatency(double latencySec) {
addLabels(serverHandledLatencySeconds).observe(latencySec);
if (!this.serverHandledLatencySeconds.isPresent()) {
return;
}
addLabels(this.serverHandledLatencySeconds.get()).observe(latencySec);
}

public void recordMessageReceived() {
addLabels(serverStreamMessagesReceived).inc();
/**
* Knows how to produce {@link ServerMetrics} instances for individual methods.
*/
static class Factory {
private final Counter serverStarted;
private final Counter serverHandled;
private final Counter serverStreamMessagesReceived;
private final Counter serverStreamMessagesSent;
private final Optional<Histogram> serverHandledLatencySeconds;

Factory(Configuration configuration) {
CollectorRegistry registry = configuration.getCollectorRegistry();
this.serverStarted = serverStartedBuilder.register(registry);
this.serverHandled = serverHandledBuilder.register(registry);
this.serverStreamMessagesReceived = serverStreamMessagesReceivedBuilder.register(registry);
this.serverStreamMessagesSent = serverStreamMessagesSentBuilder.register(registry);

if (configuration.isIncludeLatencyHistograms()) {
this.serverHandledLatencySeconds =
Optional.of(serverHandledLatencySecondsBuilder.register(registry));
} else {
this.serverHandledLatencySeconds = Optional.empty();
}
}

/** Creates a {@link ServerMetrics} for the supplied method. */
public <R, S> ServerMetrics createMetricsForMethod(MethodDescriptor<R, S> method) {
String serviceName = MethodDescriptor.extractFullServiceName(method.getFullMethodName());

// Full method names are of the form: "full.serviceName/MethodName". We extract the last part.
String methodName = method.getFullMethodName().substring(serviceName.length() + 1);
return new ServerMetrics(
method.getType().toString(),
serviceName,
methodName,
serverStarted,
serverHandled,
serverStreamMessagesReceived,
serverStreamMessagesSent,
serverHandledLatencySeconds);
}
}

private <T> T addLabels(SimpleCollector<T> collector, String... labels) {
Expand Down
Loading

0 comments on commit 937ed78

Please sign in to comment.