Skip to content

Commit 5470970

Browse files
committed
feat(metrics): add opentelemetry support
Signed-off-by: ahkui <[email protected]> #574
1 parent cdddd60 commit 5470970

13 files changed

+410
-128
lines changed

components/metrics/labels.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55

66
"github.com/ThreeDotsLabs/watermill/message"
7-
8-
"github.com/prometheus/client_golang/prometheus"
97
)
108

119
const (
@@ -26,7 +24,7 @@ var (
2624
}
2725
)
2826

29-
func labelsFromCtx(ctx context.Context, labels ...string) prometheus.Labels {
27+
func labelsFromCtx(ctx context.Context, labels ...string) map[string]string {
3028
ctxLabels := map[string]string{}
3129

3230
for _, l := range labels {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package metrics
2+
3+
import (
4+
"github.com/ThreeDotsLabs/watermill/internal"
5+
"github.com/ThreeDotsLabs/watermill/message"
6+
"github.com/pkg/errors"
7+
"go.opentelemetry.io/otel/metric"
8+
)
9+
10+
func NewOpenTelemetryMetricsBuilder(meter metric.Meter, namespace string, subsystem string) OpenTelemetryMetricsBuilder {
11+
return OpenTelemetryMetricsBuilder{
12+
Namespace: namespace,
13+
Subsystem: subsystem,
14+
meter: meter,
15+
}
16+
}
17+
18+
// OpenTelemetryMetricsBuilder provides methods to decorate publishers, subscribers and handlers.
19+
type OpenTelemetryMetricsBuilder struct {
20+
meter metric.Meter
21+
22+
Namespace string
23+
Subsystem string
24+
// PublishBuckets defines the histogram buckets for publish time histogram, defaulted if nil.
25+
PublishBuckets []float64
26+
// HandlerBuckets defines the histogram buckets for handle execution time histogram, defaulted to watermill's default.
27+
HandlerBuckets []float64
28+
}
29+
30+
// AddOpenTelemetryRouterMetrics is a convenience function that acts on the message router to add the metrics middleware
31+
// to all its handlers. The handlers' publishers and subscribers are also decorated.
32+
func (b OpenTelemetryMetricsBuilder) AddOpenTelemetryRouterMetrics(r *message.Router) {
33+
r.AddPublisherDecorators(b.DecoratePublisher)
34+
r.AddSubscriberDecorators(b.DecorateSubscriber)
35+
r.AddMiddleware(b.NewRouterMiddleware().Middleware)
36+
}
37+
38+
// DecoratePublisher wraps the underlying publisher with OpenTelemetry metrics.
39+
func (b OpenTelemetryMetricsBuilder) DecoratePublisher(pub message.Publisher) (message.Publisher, error) {
40+
var err error
41+
d := PublisherOpenTelemetryMetricsDecorator{
42+
pub: pub,
43+
publisherName: internal.StructName(pub),
44+
}
45+
46+
d.publishTimeSeconds, err = b.meter.Float64Histogram(
47+
b.name("publish_time_seconds"),
48+
metric.WithUnit("seconds"),
49+
metric.WithDescription("The time that a publishing attempt (success or not) took in seconds"),
50+
metric.WithExplicitBucketBoundaries(b.PublishBuckets...),
51+
)
52+
53+
if err != nil {
54+
return nil, errors.Wrap(err, "could not register publish time metric")
55+
}
56+
return d, nil
57+
}
58+
59+
// DecorateSubscriber wraps the underlying subscriber with OpenTelemetry metrics.
60+
func (b OpenTelemetryMetricsBuilder) DecorateSubscriber(sub message.Subscriber) (message.Subscriber, error) {
61+
var err error
62+
d := &SubscriberOpenTelemetryMetricsDecorator{
63+
subscriberName: internal.StructName(sub),
64+
}
65+
66+
d.subscriberMessagesReceivedTotal, err = b.meter.Int64Counter(
67+
b.name("subscriber_messages_received_total"),
68+
metric.WithDescription("The total number of messages received by the subscriber"),
69+
)
70+
if err != nil {
71+
return nil, errors.Wrap(err, "could not register time to ack metric")
72+
}
73+
74+
d.Subscriber, err = message.MessageTransformSubscriberDecorator(d.recordMetrics)(sub)
75+
if err != nil {
76+
return nil, errors.Wrap(err, "could not decorate subscriber with metrics decorator")
77+
}
78+
79+
return d, nil
80+
}
81+
func (b OpenTelemetryMetricsBuilder) name(name string) string {
82+
if b.Subsystem != "" {
83+
name = b.Subsystem + "_" + name
84+
}
85+
if b.Namespace != "" {
86+
name = b.Namespace + "_" + name
87+
}
88+
return name
89+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package metrics
2+
3+
import (
4+
"time"
5+
6+
"github.com/ThreeDotsLabs/watermill/message"
7+
"github.com/pkg/errors"
8+
"go.opentelemetry.io/otel/attribute"
9+
"go.opentelemetry.io/otel/metric"
10+
)
11+
12+
var (
13+
handlerLabelKeys = []string{
14+
labelKeyHandlerName,
15+
labelSuccess,
16+
}
17+
18+
// defaultHandlerExecutionTimeBuckets are one order of magnitude smaller than default buckets (5ms~10s),
19+
// because the handler execution times are typically shorter (µs~ms range).
20+
defaultHandlerExecutionTimeBuckets = []float64{
21+
0.0005,
22+
0.001,
23+
0.0025,
24+
0.005,
25+
0.01,
26+
0.025,
27+
0.05,
28+
0.1,
29+
0.25,
30+
0.5,
31+
1,
32+
}
33+
)
34+
35+
// HandlerOpenTelemetryMetricsMiddleware is a middleware that captures OpenTelemetry metrics.
36+
type HandlerOpenTelemetryMetricsMiddleware struct {
37+
handlerExecutionTimeSeconds metric.Float64Histogram
38+
}
39+
40+
// Middleware returns the middleware ready to be used with watermill's Router.
41+
func (m HandlerOpenTelemetryMetricsMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
42+
return func(msg *message.Message) (msgs []*message.Message, err error) {
43+
now := time.Now()
44+
ctx := msg.Context()
45+
labels := []attribute.KeyValue{
46+
attribute.String(labelKeyHandlerName, message.HandlerNameFromCtx(ctx)),
47+
}
48+
49+
defer func() {
50+
if err != nil {
51+
labels = append(labels, attribute.String(labelSuccess, "false"))
52+
} else {
53+
labels = append(labels, attribute.String(labelSuccess, "true"))
54+
}
55+
m.handlerExecutionTimeSeconds.Record(
56+
ctx,
57+
time.Since(now).Seconds(),
58+
metric.WithAttributes(labels...),
59+
)
60+
}()
61+
62+
return h(msg)
63+
}
64+
}
65+
66+
// NewRouterMiddleware returns new middleware.
67+
func (b OpenTelemetryMetricsBuilder) NewRouterMiddleware() HandlerOpenTelemetryMetricsMiddleware {
68+
var err error
69+
m := HandlerOpenTelemetryMetricsMiddleware{}
70+
71+
if b.HandlerBuckets == nil {
72+
b.HandlerBuckets = defaultHandlerExecutionTimeBuckets
73+
}
74+
75+
m.handlerExecutionTimeSeconds, err = b.meter.Float64Histogram(
76+
b.name("handler_execution_time_seconds"),
77+
metric.WithUnit("seconds"),
78+
metric.WithDescription("The total time elapsed while executing the handler function in seconds"),
79+
metric.WithExplicitBucketBoundaries(b.HandlerBuckets...),
80+
)
81+
if err != nil {
82+
panic(errors.Wrap(err, "could not register handler execution time metric"))
83+
}
84+
85+
return m
86+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package metrics
2+
3+
import (
4+
"time"
5+
6+
"github.com/ThreeDotsLabs/watermill/message"
7+
"go.opentelemetry.io/otel/attribute"
8+
"go.opentelemetry.io/otel/metric"
9+
)
10+
11+
// PublisherOpenTelemetryMetricsDecorator decorates a publisher to capture OpenTelemetry metrics.
12+
type PublisherOpenTelemetryMetricsDecorator struct {
13+
pub message.Publisher
14+
publisherName string
15+
publishTimeSeconds metric.Float64Histogram
16+
}
17+
18+
// Publish updates the relevant publisher metrics and calls the wrapped publisher's Publish.
19+
func (m PublisherOpenTelemetryMetricsDecorator) Publish(topic string, messages ...*message.Message) (err error) {
20+
if len(messages) == 0 {
21+
return m.pub.Publish(topic)
22+
}
23+
24+
// TODO: take ctx not only from first msg. Might require changing the signature of Publish, which is planned anyway.
25+
ctx := messages[0].Context()
26+
labelsMap := labelsFromCtx(ctx, publisherLabelKeys...)
27+
if labelsMap[labelKeyPublisherName] == "" {
28+
labelsMap[labelKeyPublisherName] = m.publisherName
29+
}
30+
if labelsMap[labelKeyHandlerName] == "" {
31+
labelsMap[labelKeyHandlerName] = labelValueNoHandler
32+
}
33+
labels := make([]attribute.KeyValue, len(labelsMap))
34+
for k, v := range labelsMap {
35+
labels = append(labels, attribute.String(k, v))
36+
}
37+
start := time.Now()
38+
39+
defer func() {
40+
if publishAlreadyObserved(ctx) {
41+
// decorator idempotency when applied decorator multiple times
42+
return
43+
}
44+
45+
if err != nil {
46+
labels = append(labels, attribute.String(labelSuccess, "false"))
47+
} else {
48+
labels = append(labels, attribute.String(labelSuccess, "true"))
49+
}
50+
51+
m.publishTimeSeconds.Record(
52+
ctx,
53+
time.Since(start).Seconds(),
54+
metric.WithAttributes(labels...),
55+
)
56+
}()
57+
58+
for _, msg := range messages {
59+
msg.SetContext(setPublishObservedToCtx(msg.Context()))
60+
}
61+
62+
return m.pub.Publish(topic, messages...)
63+
}
64+
65+
// Close calls the wrapped publisher's Close and returns its error.
66+
func (m PublisherOpenTelemetryMetricsDecorator) Close() error {
67+
return m.pub.Close()
68+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package metrics
2+
3+
import (
4+
"github.com/ThreeDotsLabs/watermill/message"
5+
"go.opentelemetry.io/otel/attribute"
6+
"go.opentelemetry.io/otel/metric"
7+
)
8+
9+
// SubscriberOpenTelemetryMetricsDecorator decorates a subscriber to capture OpenTelemetry metrics.
10+
type SubscriberOpenTelemetryMetricsDecorator struct {
11+
message.Subscriber
12+
subscriberName string
13+
subscriberMessagesReceivedTotal metric.Int64Counter
14+
}
15+
16+
func (s SubscriberOpenTelemetryMetricsDecorator) recordMetrics(msg *message.Message) {
17+
if msg == nil {
18+
return
19+
}
20+
21+
ctx := msg.Context()
22+
labelsMap := labelsFromCtx(ctx, subscriberLabelKeys...)
23+
if labelsMap[labelKeySubscriberName] == "" {
24+
labelsMap[labelKeySubscriberName] = s.subscriberName
25+
}
26+
if labelsMap[labelKeyHandlerName] == "" {
27+
labelsMap[labelKeyHandlerName] = labelValueNoHandler
28+
}
29+
labels := make([]attribute.KeyValue, len(labelsMap))
30+
for k, v := range labelsMap {
31+
labels = append(labels, attribute.String(k, v))
32+
}
33+
34+
go func() {
35+
if subscribeAlreadyObserved(ctx) {
36+
// decorator idempotency when applied decorator multiple times
37+
return
38+
}
39+
40+
select {
41+
case <-msg.Acked():
42+
labels = append(labels, attribute.String(labelAcked, "acked"))
43+
case <-msg.Nacked():
44+
labels = append(labels, attribute.String(labelAcked, "nacked"))
45+
}
46+
s.subscriberMessagesReceivedTotal.Add(ctx, 1, metric.WithAttributes(labels...))
47+
}()
48+
49+
msg.SetContext(setSubscribeObservedToCtx(msg.Context()))
50+
}
File renamed without changes.

components/metrics/handler.go renamed to components/metrics/prometheus_handler.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,6 @@ import (
99
"github.com/ThreeDotsLabs/watermill/message"
1010
)
1111

12-
var (
13-
handlerLabelKeys = []string{
14-
labelKeyHandlerName,
15-
labelSuccess,
16-
}
17-
18-
// defaultHandlerExecutionTimeBuckets are one order of magnitude smaller than default buckets (5ms~10s),
19-
// because the handler execution times are typically shorter (µs~ms range).
20-
defaultHandlerExecutionTimeBuckets = []float64{
21-
0.0005,
22-
0.001,
23-
0.0025,
24-
0.005,
25-
0.01,
26-
0.025,
27-
0.05,
28-
0.1,
29-
0.25,
30-
0.5,
31-
1,
32-
}
33-
)
34-
3512
// HandlerPrometheusMetricsMiddleware is a middleware that captures Prometheus metrics.
3613
type HandlerPrometheusMetricsMiddleware struct {
3714
handlerExecutionTimeSeconds *prometheus.HistogramVec

0 commit comments

Comments
 (0)