Skip to content

Commit

Permalink
consumer and publisher metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoragrega committed Nov 19, 2021
1 parent b41c04b commit 69e2d69
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 117 deletions.
75 changes: 47 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,72 +15,91 @@ import (
pubsubprom "github.com/hmoragrega/pubsub-prometheus"
)

// Initialize a router will default metrics.
router := pubsubprom.MustInstrument(pubsub.Router{})
// Initialize a router with default metrics.
router := pubsubprom.MustInstrumentRouter(pubsub.Router{})

// Add the publisher middleware for publishing metrics.
publisher := pubsubprom.InstrumentPublisher(publisher)
```

## List of provided metrics:

* `pubsub_message_processed` (`histogram`)
* `pubsub_message_acknowledgements` (`counter`)
* `pubsub_message_checkpoint` (`counter`)
* `pubsub_message_consumed` (`counter`)
* `pubsub_message_published` (`counter`)
* `pubsub_message_publishing` (`histogram`)

## Configuration

Metrics can be tweaked using a custom monitor
```go
import (
"github.com/hmoragrega/pubsub"
pubsubprom "github.com/hmoragrega/pubsub-prometheus"
)
The names of the metrics can be tweaked in multiple ways:

var router pubsub.Router
err := pubsubprom.InstrumentWithMonitor(&pubsubprom.Monitor{
// tweak options
}, &router)
```
The monitor accepts a namespace and subsystem option that will be applied to all metrics.

## Provided Metrics
You can also tweak the name of a single metric and other parameters by adjusting the prometheus option struct.

### Common labels:
## Provided Metrics

All metrics come with these labels
Most metrics share these common labels when available in the operation:

* `consumer`: name of the consumer.
* `msg_name`: name of the message (if any).
* `msg_name`: name of the message.
* `msg_version`: version of the message.
* `error`: `true|false` if the monitored operation reported an error.

### Processed message histogram
### Processed messages histogram

The amount of time it took to process a message in the consumer.

* Name: `pubsub_message_processed`
* Default name: `pubsub_message_processed`

### Checkpoint counters

For every executed checkpoint while processing a message

* Name: `pubsub_message_checkpoint`
* Extra Labels:
* Default name: `pubsub_message_checkpoint`
* Extra labels:
* `checkpoint`: the name of the checkpoint

### Acknowledgements counter
### Message acknowledgements counter

The acknowledgements operations.
The message acknowledgements operations.

* Name: `pubsub_message_acknowledgements`
* Extra Labels:
* Default name: `pubsub_message_acknowledgements`
* Extra labels:
* `operation`: `ack|nack|re-schedule`

## Customize Metrics
### Consumed messages

Metrics can be customized using the exported options, for example changing the metric name or adding the namespace and
subsystem information.
Number of messages consumed by consumer.

* Default name: `pubsub_message_consumed`

### Published messages

Number of messages published by publisher.

* Default name: `pubsub_message_published`

### Publishing operations

A histogram with the result of publishing operations.

* Default name: `pubsub_message_publishing`

[ci-badge]: https://github.com/hmoragrega/pubsub-prometheus/workflows/CI/badge.svg

[ci-url]: https://github.com/hmoragrega/pubsub-prometheus/actions?query=workflow%3ACI

[coverage-badge]: https://coveralls.io/repos/github/hmoragrega/pubsub-prometheus/badge.svg?branch=main

[coverage-url]: https://coveralls.io/github/hmoragrega/pubsub-prometheus?branch=main

[godoc-badge]: https://pkg.go.dev/badge/github.com/hmoragrega/pubsub-prometheus.svg

[godoc-url]: https://pkg.go.dev/github.com/hmoragrega/pubsub-prometheus

[goreport-badge]: https://goreportcard.com/badge/github.com/hmoragrega/pubsub-prometheus

[goreport-url]: https://goreportcard.com/report/github.com/hmoragrega/pubsub-prometheus
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.15

require (
github.com/hashicorp/go-multierror v1.1.0
github.com/hmoragrega/pubsub v0.7.0
github.com/hmoragrega/pubsub v0.9.0
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hmoragrega/pubsub v0.7.0 h1:GXWbTAe+Pq302ZVnYFqoLwTt0K12KN+K4uAWC5oDQIE=
github.com/hmoragrega/pubsub v0.7.0/go.mod h1:/Ee+F0Dyk1ehhnHvRO5k7TMl0Yhj5GJ/okhRm2jpwQM=
github.com/hmoragrega/pubsub v0.9.0 h1:1lzGruIWR9y1QiRfen21CJQdSz/vNKHAMzD3hixSJ6A=
github.com/hmoragrega/pubsub v0.9.0/go.mod h1:/Ee+F0Dyk1ehhnHvRO5k7TMl0Yhj5GJ/okhRm2jpwQM=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down
106 changes: 80 additions & 26 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pubsub_prometheus

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -21,6 +20,7 @@ const (

checkpointKey labelKey = "checkpoint"
operationKey labelKey = "operation"
topicKey labelKey = "topic"
)

var (
Expand All @@ -44,6 +44,15 @@ type Monitor struct {
// AckOpts optional options for the message acknowledgements counter.
AckOpts prometheus.CounterOpts

// PublishOpts optional options for the publishing attempts histogram.
PublishOpts prometheus.HistogramOpts

// PublishedOpts optional options for the published messages counter.
PublishedOpts prometheus.CounterOpts

// ConsumedOpts optional options for the subscriber next message counter.
ConsumedOpts prometheus.CounterOpts

// Namespace will be used on all metrics unless overwritten by the
// specific metric config.
Namespace string
Expand All @@ -55,6 +64,9 @@ type Monitor struct {
processed *prometheus.HistogramVec
checkpoint *prometheus.CounterVec
ack *prometheus.CounterVec
publish *prometheus.HistogramVec
published *prometheus.CounterVec
consumed *prometheus.CounterVec

once sync.Once
registerErr error
Expand All @@ -78,6 +90,9 @@ func (m *Monitor) Register() error {
m.buildProcessed(m.ProcessedOpts),
m.buildCheckpoint(m.CheckpointOpts),
m.buildAck(m.AckOpts),
m.buildPublish(m.PublishOpts),
m.buildPublished(m.PublishedOpts),
m.buildConsumed(m.ConsumedOpts),
}

result := make(chan error, 4)
Expand All @@ -99,44 +114,44 @@ func (m *Monitor) Register() error {
return m.registerErr
}

// MustInstrumentWithMonitor helper to instrument a router and returns the same instance,
// MustInstrumentRouterWithMonitor helper to instrument a router and returns the same instance,
// use it for one line router initializations.
//
// It will panic on metric registration error.
func MustInstrumentWithMonitor(monitor *Monitor, router *pubsub.Router) *pubsub.Router {
return monitor.MustInstrument(router)
func MustInstrumentRouterWithMonitor(monitor *Monitor, router *pubsub.Router) *pubsub.Router {
return monitor.MustInstrumentRouter(router)
}

// MustInstrument helper to instrument a router and returns the same instance,
// MustInstrumentRouter helper to instrument a router and returns the same instance,
// use it for one line router initializations.
//
// It will panic on metric registration error.
func MustInstrument(router *pubsub.Router) *pubsub.Router {
return MustInstrumentWithMonitor(&Monitor{}, router)
func MustInstrumentRouter(router *pubsub.Router) *pubsub.Router {
return MustInstrumentRouterWithMonitor(&Monitor{}, router)
}

// MustInstrument helper to instrument a router and returns the same instance.
// MustInstrumentRouter helper to instrument a router and returns the same instance.
//
// It will panic on metric registration error.
func (m *Monitor) MustInstrument(router *pubsub.Router) *pubsub.Router {
if err := m.Instrument(router); err != nil {
func (m *Monitor) MustInstrumentRouter(router *pubsub.Router) *pubsub.Router {
if err := m.InstrumentRouter(router); err != nil {
panic(err)
}
return router
}

// InstrumentWithMonitor helper to instrument a router returning any errors that may happen.
func InstrumentWithMonitor(monitor *Monitor, router *pubsub.Router) error {
return monitor.Instrument(router)
// InstrumentRouterWithMonitor helper to instrument a router returning any errors that may happen.
func InstrumentRouterWithMonitor(monitor *Monitor, router *pubsub.Router) error {
return monitor.InstrumentRouter(router)
}

// Instrument helper to instrument a router returning any errors that may happen.
func Instrument(router *pubsub.Router) error {
return InstrumentWithMonitor(&Monitor{}, router)
// InstrumentRouter helper to instrument a router returning any errors that may happen.
func InstrumentRouter(router *pubsub.Router) error {
return InstrumentRouterWithMonitor(&Monitor{}, router)
}

// Instrument a router returning any errors that may happen.
func (m *Monitor) Instrument(router *pubsub.Router) error {
// InstrumentRouter a router returning any errors that may happen.
func (m *Monitor) InstrumentRouter(router *pubsub.Router) error {
if err := m.Register(); err != nil {
return err
}
Expand All @@ -150,16 +165,22 @@ func (m *Monitor) Instrument(router *pubsub.Router) error {
// processed messages
router.OnProcess = pubsub.WrapOnProcess(router.OnProcess, m.onProcess)

// acknowledgements
router.MessageModifier = pubsub.WrapMessageModifier(router.MessageModifier, messageWrapper(m.ack))
// acknowledgements and consume operations
router.OnNext = pubsub.WrapNext(router.OnNext, m.onNext)

return nil
}

func messageWrapper(ack *prometheus.CounterVec) pubsub.MessageModifier {
return func(ctx context.Context, consumerName string, message pubsub.ReceivedMessage) pubsub.ReceivedMessage {
return wrap(message, consumerName, ack)
func (m *Monitor) onNext(_ context.Context, consumerName string, next pubsub.Next) pubsub.Next {
if msg := next.Message; msg != nil {
next.Message = wrap(msg, consumerName, m.ack)
}

m.consumed.With(
metricLabels(nil, consumerName, next.Message, next.Err)).
Add(1)

return next
}

// OnProcess will sent a histogram metric
Expand Down Expand Up @@ -236,19 +257,52 @@ func (m *Monitor) buildAck(opts prometheus.CounterOpts) *prometheus.CounterVec {
return h
}

func (m *Monitor) buildConsumed(opts prometheus.CounterOpts) *prometheus.CounterVec {
if opts.Name == "" {
opts.Name = "pubsub_message_consumed"
}
if opts.Help == "" {
opts.Help = "Counter consumed next message"
}
if opts.Namespace == "" {
opts.Namespace = m.Namespace
}
if opts.Subsystem == "" {
opts.Subsystem = m.Subsystem
}

h := prometheus.NewCounterVec(opts, metricKeys())
m.consumed = h
return h
}

func metricKeys(keys ...string) []string {
return append(keys, commonLabels...)
}

func metricLabels(custom map[string]string, consumerName string, msg pubsub.ReceivedMessage, err error) map[string]string {
var version string
var msgName string
if msg != nil {
version = msg.Version()
msgName = msg.Name()
}

labels := map[string]string{
consumerKey: consumerName,
msgNameKey: msg.Name(),
msgVersionKey: msg.Version(),
errorKey: fmt.Sprintf("%v", err != nil),
msgNameKey: msgName,
msgVersionKey: version,
errorKey: errorLabel(err),
}
for k, v := range custom {
labels[k] = v
}
return labels
}

func errorLabel(err error) string {
if err != nil {
return "true"
}
return "false"
}
Loading

0 comments on commit 69e2d69

Please sign in to comment.