Skip to content

Commit

Permalink
feat(cloudmonitoring): add grpc metric middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
ericwenn committed Mar 1, 2022
1 parent 2190112 commit e4b09bc
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 3 deletions.
147 changes: 147 additions & 0 deletions cloudmonitoring/metricmiddleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cloudmonitoring

import (
"context"
"fmt"
"strings"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/unit"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// Metric names are based on OTEL semantic conventions for metrics.
// See:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics
const (
// Names of the histograms that record RPC duration on the server and
// client side respectively.
serverRequestDurationMetricName = "rpc.server.duration"
clientRequestDurationMetricName = "rpc.client.duration"

// There is no rpc_count equivalent in OTEL semantic conventions yet.
serverRequestCountMetricName = "rpc.server.rpc_count"
clientRequestCountMetricName = "rpc.client.rpc_count"
)

// NewMetricMiddleware builds a MetricMiddleware whose request count counters
// and request duration histograms are created on the global meter named
// "cloudrunner-go/cloudmonitoring".
//
// If any instrument cannot be created, an empty MetricMiddleware is returned
// together with an error that wraps the underlying cause.
func NewMetricMiddleware() (MetricMiddleware, error) {
	var (
		middleware MetricMiddleware
		err        error
	)
	meter := global.Meter("cloudrunner-go/cloudmonitoring")

	// Server-side instruments.
	if middleware.serverRequestCount, err = meter.NewInt64Counter(
		serverRequestCountMetricName,
		metric.WithUnit(unit.Dimensionless),
		metric.WithDescription("Count of RPCs received by a gRPC server."),
	); err != nil {
		return MetricMiddleware{}, fmt.Errorf("create server request count counter: %w", err)
	}
	if middleware.serverRequestDuration, err = meter.NewInt64Histogram(
		serverRequestDurationMetricName,
		metric.WithUnit(unit.Milliseconds),
		metric.WithDescription("Duration of RPCs received by a gRPC server."),
	); err != nil {
		return MetricMiddleware{}, fmt.Errorf("create server request duration histogram: %w", err)
	}

	// Client-side instruments.
	if middleware.clientRequestCount, err = meter.NewInt64Counter(
		clientRequestCountMetricName,
		metric.WithUnit(unit.Dimensionless),
		metric.WithDescription("Count of RPCs sent by a gRPC client."),
	); err != nil {
		return MetricMiddleware{}, fmt.Errorf("create client request count counter: %w", err)
	}
	if middleware.clientRequestDuration, err = meter.NewInt64Histogram(
		clientRequestDurationMetricName,
		metric.WithUnit(unit.Milliseconds),
		metric.WithDescription("Duration of RPCs sent by a gRPC client."),
	); err != nil {
		return MetricMiddleware{}, fmt.Errorf("create client request duration histogram: %w", err)
	}

	return middleware, nil
}

// MetricMiddleware holds the instruments that the gRPC unary server and
// client interceptors record to. Use NewMetricMiddleware to obtain a value
// with all instruments initialized.
type MetricMiddleware struct {
// serverRequestCount is incremented once per RPC handled by the server interceptor.
serverRequestCount metric.Int64Counter
// serverRequestDuration records the handler duration in milliseconds.
serverRequestDuration metric.Int64Histogram
// clientRequestCount is incremented once per RPC sent through the client interceptor.
clientRequestCount metric.Int64Counter
// clientRequestDuration records the invocation duration in milliseconds.
clientRequestDuration metric.Int64Histogram
}

// GRPCUnaryServerInterceptor is a grpc.UnaryServerInterceptor that records
// one request count and one request duration (in milliseconds) for every
// unary RPC handled by the server.
//
// The handler's response and error are passed through unchanged; the
// recorded attributes are derived from info.FullMethod and the gRPC status
// code of the returned error.
func (m *MetricMiddleware) GRPCUnaryServerInterceptor(
	ctx context.Context,
	request interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (resp interface{}, err error) {
	began := time.Now()
	resp, err = handler(ctx, request)
	elapsed := time.Since(began)

	attributes := rpcAttrs(info.FullMethod, status.Code(err))
	m.serverRequestCount.Add(ctx, 1, attributes...)
	m.serverRequestDuration.Record(ctx, elapsed.Milliseconds(), attributes...)
	return resp, err
}

// GRPCUnaryClientInterceptor implements grpc.UnaryClientInterceptor and
// emits metrics for request count and request duration when a gRPC client
// sends requests.
//
// The error returned by the invoker is passed through unchanged; the
// recorded attributes are derived from fullMethod and the gRPC status code
// of that error.
func (m *MetricMiddleware) GRPCUnaryClientInterceptor(
	ctx context.Context,
	fullMethod string,
	request interface{},
	response interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error {
	startTime := time.Now()
	err := invoker(ctx, fullMethod, request, response, cc, opts...)
	// Sample the duration immediately after the call returns, as the server
	// interceptor does, so that status extraction is not part of the
	// measured time.
	duration := time.Since(startTime)
	code := status.Code(err)

	attrs := rpcAttrs(fullMethod, code)
	m.clientRequestCount.Add(ctx, 1, attrs...)
	m.clientRequestDuration.Record(ctx, duration.Milliseconds(), attrs...)
	return err
}

// rpcAttrs returns the attributes attached to every RPC metric data point:
// the RPC system ("grpc"), the numeric gRPC status code, a string form of
// the status code and, when fullMethod can be split into its parts, the
// service and method names.
func rpcAttrs(fullMethod string, code codes.Code) []attribute.KeyValue {
	// Three attributes are always present; service and method are optional.
	const maxAttrs = 5
	result := append(
		make([]attribute.KeyValue, 0, maxAttrs),
		semconv.RPCSystemKey.String("grpc"),
		semconv.RPCGRPCStatusCodeKey.Int64(int64(code)),
		// Google Cloud Monitoring does not recognize the semconv status code
		// enum, so also add an attribute carrying the string representation
		// of the status code.
		attribute.Stringer("rpc.grpc.code", code),
	)
	service, method, ok := splitFullMethod(fullMethod)
	if !ok {
		return result
	}
	return append(
		result,
		semconv.RPCServiceKey.String(service),
		semconv.RPCMethodKey.String(method),
	)
}

// splitFullMethod splits a gRPC full method name of the form
// "/service/method" into its service and method parts. A single leading
// slash is optional and is dropped before splitting at the first remaining
// slash; everything after that slash is returned as the method.
//
// ok is false, and both parts are empty, when no separating slash is found.
func splitFullMethod(fullMethod string) (service, method string, ok bool) {
	trimmed := strings.TrimPrefix(fullMethod, "/")
	slash := strings.IndexByte(trimmed, '/')
	if slash < 0 {
		return "", "", false
	}
	return trimmed[:slash], trimmed[slash+1:], true
}
1 change: 1 addition & 0 deletions dialservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func DialService(ctx context.Context, target string, opts ...grpc.DialOption) (*
grpc.WithDefaultServiceConfig(run.config.Client.AsServiceConfigJSON()),
grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(),
run.metricMiddleware.GRPCUnaryClientInterceptor,
run.requestLoggerMiddleware.GRPCUnaryClientInterceptor,
run.clientMiddleware.GRPCUnaryClientInterceptor,
),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.29.0
go.opentelemetry.io/contrib/instrumentation/runtime v0.29.0
go.opentelemetry.io/otel v1.4.1
go.opentelemetry.io/otel/metric v0.27.0
go.opentelemetry.io/otel/sdk v1.4.1
go.opentelemetry.io/otel/sdk/metric v0.26.0
go.uber.org/zap v1.21.0
Expand Down Expand Up @@ -51,7 +52,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel/internal/metric v0.27.0 // indirect
go.opentelemetry.io/otel/metric v0.27.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.26.0 // indirect
go.opentelemetry.io/otel/trace v1.4.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
Expand Down
5 changes: 3 additions & 2 deletions grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ func NewGRPCServer(ctx context.Context, opts ...grpc.ServerOption) *grpc.Server
serverOptions := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(
otelgrpc.UnaryServerInterceptor(),
run.loggerMiddleware.GRPCUnaryServerInterceptor, // adds context logger
run.traceMiddleware.GRPCServerUnaryInterceptor, // needs the context logger
run.loggerMiddleware.GRPCUnaryServerInterceptor, // adds context logger
run.traceMiddleware.GRPCServerUnaryInterceptor, // needs the context logger
run.metricMiddleware.GRPCUnaryServerInterceptor,
run.requestLoggerMiddleware.GRPCUnaryServerInterceptor, // needs to run after trace
run.serverMiddleware.GRPCUnaryServerInterceptor, // needs to run after request logger
),
Expand Down
5 changes: 5 additions & 0 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func Run(fn func(context.Context) error, options ...Option) error {
run.traceMiddleware.ProjectID = run.config.Runtime.ProjectID
run.serverMiddleware.Config = run.config.Server
run.requestLoggerMiddleware.Config = run.config.RequestLogger
run.metricMiddleware, err = cloudmonitoring.NewMetricMiddleware()
if err != nil {
return fmt.Errorf("cloudrunner.Run: %w", err)
}
ctx = withRunContext(ctx, &run)
ctx = cloudruntime.WithConfig(ctx, run.config.Runtime)
logger, err := cloudzap.NewLogger(run.config.Logger)
Expand Down Expand Up @@ -120,6 +124,7 @@ type runContext struct {
clientMiddleware cloudclient.Middleware
requestLoggerMiddleware cloudrequestlog.Middleware
traceMiddleware cloudtrace.Middleware
metricMiddleware cloudmonitoring.MetricMiddleware
}

type runContextKey struct{}
Expand Down

0 comments on commit e4b09bc

Please sign in to comment.