Skip to content

Commit a73ddbe

Browse files
author
A.Samet İleri
authored
feat: add metric prefix config and expose metricCollectors (#106)
1 parent bce9002 commit a73ddbe

File tree

16 files changed

+335
-76
lines changed

16 files changed

+335
-76
lines changed

README.md

Lines changed: 49 additions & 47 deletions
Large diffs are not rendered by default.

batch_consumer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package kafka
33
import (
44
"time"
55

6+
"github.com/prometheus/client_golang/prometheus"
7+
68
"github.com/segmentio/kafka-go"
79

810
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
@@ -51,8 +53,8 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
5153
return &c, nil
5254
}
5355

54-
func (b *batchConsumer) GetMetric() *ConsumerMetric {
55-
return b.metric
56+
func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector {
57+
return b.base.GetMetricCollectors()
5658
}
5759

5860
func (b *batchConsumer) Consume() {

collector.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,43 @@ import (
88

99
const Name = "kafka_konsumer"
1010

11-
type metricCollector struct {
11+
type MetricCollector struct {
1212
consumerMetric *ConsumerMetric
1313

1414
totalUnprocessedMessagesCounter *prometheus.Desc
1515
totalProcessedMessagesCounter *prometheus.Desc
1616
}
1717

18-
func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
18+
func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *MetricCollector {
19+
if metricPrefix == "" {
20+
metricPrefix = Name
21+
}
22+
23+
return &MetricCollector{
24+
consumerMetric: consumerMetric,
25+
26+
totalProcessedMessagesCounter: prometheus.NewDesc(
27+
prometheus.BuildFQName(metricPrefix, "processed_messages_total", "current"),
28+
"Total number of processed messages.",
29+
emptyStringList,
30+
nil,
31+
),
32+
totalUnprocessedMessagesCounter: prometheus.NewDesc(
33+
prometheus.BuildFQName(metricPrefix, "unprocessed_messages_total", "current"),
34+
"Total number of unprocessed messages.",
35+
emptyStringList,
36+
nil,
37+
),
38+
}
39+
}
40+
41+
func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc) {
1942
prometheus.DescribeByCollect(s, ch)
2043
}
2144

2245
var emptyStringList []string
2346

24-
func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
47+
func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) {
2548
ch <- prometheus.MustNewConstMetric(
2649
s.totalProcessedMessagesCounter,
2750
prometheus.CounterValue,
@@ -37,31 +60,12 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
3760
)
3861
}
3962

40-
func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
41-
return &metricCollector{
42-
consumerMetric: consumerMetric,
43-
44-
totalProcessedMessagesCounter: prometheus.NewDesc(
45-
prometheus.BuildFQName(Name, "processed_messages_total", "current"),
46-
"Total number of processed messages.",
47-
emptyStringList,
48-
nil,
49-
),
50-
totalUnprocessedMessagesCounter: prometheus.NewDesc(
51-
prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
52-
"Total number of unprocessed messages.",
53-
emptyStringList,
54-
nil,
55-
),
56-
}
57-
}
58-
5963
func NewMetricMiddleware(cfg *ConsumerConfig,
6064
app *fiber.App,
6165
consumerMetric *ConsumerMetric,
6266
metricCollectors ...prometheus.Collector,
6367
) (func(ctx *fiber.Ctx) error, error) {
64-
prometheus.DefaultRegisterer.MustRegister(newMetricCollector(consumerMetric))
68+
prometheus.DefaultRegisterer.MustRegister(NewMetricCollector(cfg.MetricPrefix, consumerMetric))
6569
prometheus.DefaultRegisterer.MustRegister(metricCollectors...)
6670

6771
fiberPrometheus := fiberprometheus.New(cfg.Reader.GroupID)

collector_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package kafka
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
)
9+
10+
func Test_NewCollector(t *testing.T) {
11+
t.Run("When_Default_Prefix_Value_Used", func(t *testing.T) {
12+
cronsumerMetric := &ConsumerMetric{}
13+
expectedTotalProcessedMessagesCounter := prometheus.NewDesc(
14+
prometheus.BuildFQName(Name, "processed_messages_total", "current"),
15+
"Total number of processed messages.",
16+
emptyStringList,
17+
nil,
18+
)
19+
expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc(
20+
prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
21+
"Total number of unprocessed messages.",
22+
emptyStringList,
23+
nil,
24+
)
25+
26+
collector := NewMetricCollector("", cronsumerMetric)
27+
28+
if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) {
29+
t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter)
30+
}
31+
if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) {
32+
t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter)
33+
}
34+
})
35+
t.Run("When_Custom_Prefix_Value_Used", func(t *testing.T) {
36+
cronsumerMetric := &ConsumerMetric{}
37+
expectedTotalProcessedMessagesCounter := prometheus.NewDesc(
38+
prometheus.BuildFQName("custom_prefix", "processed_messages_total", "current"),
39+
"Total number of processed messages.",
40+
emptyStringList,
41+
nil,
42+
)
43+
expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc(
44+
prometheus.BuildFQName("custom_prefix", "unprocessed_messages_total", "current"),
45+
"Total number of unprocessed messages.",
46+
emptyStringList,
47+
nil,
48+
)
49+
50+
collector := NewMetricCollector("custom_prefix", cronsumerMetric)
51+
52+
if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) {
53+
t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter)
54+
}
55+
if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) {
56+
t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter)
57+
}
58+
})
59+
}

consumer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package kafka
33
import (
44
"time"
55

6+
"github.com/prometheus/client_golang/prometheus"
7+
68
"github.com/segmentio/kafka-go"
79

810
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
@@ -46,6 +48,10 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
4648
return &c, nil
4749
}
4850

51+
func (c *consumer) GetMetricCollectors() []prometheus.Collector {
52+
return c.base.GetMetricCollectors()
53+
}
54+
4955
func (c *consumer) Consume() {
5056
go c.subprocesses.Start()
5157

consumer_base.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ type Consumer interface {
2626
// Resume function resumes consumer, it is start to working
2727
Resume()
2828

29+
// GetMetricCollectors for the purpose of making metric collectors available.
30+
// You can register these collectors on your own http server.
31+
// Please look at the examples/with-metric-collector directory.
32+
GetMetricCollectors() []prometheus.Collector
33+
2934
// WithLogger for injecting custom log implementation
3035
WithLogger(logger LoggerInterface)
3136

@@ -72,6 +77,7 @@ type base struct {
7277
transactionalRetry bool
7378
distributedTracingEnabled bool
7479
consumerState state
80+
metricPrefix string
7581
}
7682

7783
func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
@@ -109,6 +115,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
109115
batchConsumingStream: make(chan []*Message, cfg.Concurrency),
110116
consumerState: stateRunning,
111117
skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn,
118+
metricPrefix: cfg.MetricPrefix,
112119
}
113120

114121
if cfg.DistributedTracingEnabled {
@@ -127,6 +134,18 @@ func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Messa
127134
c.subprocesses.Add(c.cronsumer)
128135
}
129136

137+
func (c *base) GetMetricCollectors() []prometheus.Collector {
138+
var metricCollectors []prometheus.Collector
139+
140+
if c.retryEnabled {
141+
metricCollectors = c.cronsumer.GetMetricCollectors()
142+
}
143+
144+
metricCollectors = append(metricCollectors, NewMetricCollector(c.metricPrefix, c.metric))
145+
146+
return metricCollectors
147+
}
148+
130149
func (c *base) setupAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric) {
131150
c.logger.Debug("Initializing API")
132151

consumer_config.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,20 @@ type ConsumerConfig struct {
5151
DistributedTracingEnabled bool
5252
RetryEnabled bool
5353
APIEnabled bool
54+
55+
// MetricPrefix is used for prometheus fq name prefix.
56+
// If not provided, default metric prefix value is `kafka_konsumer`.
57+
// Currently, there are two exposed prometheus metrics. `processed_messages_total_current` and `unprocessed_messages_total_current`.
58+
// So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and
59+
// `kafka_konsumer_unprocessed_messages_total_current`.
60+
MetricPrefix string
5461
}
5562

5663
func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
5764
cronsumerCfg := kcronsumer.Config{
58-
ClientID: cfg.RetryConfiguration.ClientID,
59-
Brokers: cfg.RetryConfiguration.Brokers,
65+
MetricPrefix: cfg.RetryConfiguration.MetricPrefix,
66+
ClientID: cfg.RetryConfiguration.ClientID,
67+
Brokers: cfg.RetryConfiguration.Brokers,
6068
Consumer: kcronsumer.ConsumerConfig{
6169
ClientID: cfg.ClientID,
6270
GroupID: cfg.Reader.GroupID,
@@ -131,6 +139,13 @@ func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header {
131139
}
132140

133141
type RetryConfiguration struct {
142+
// MetricPrefix is used for prometheus fq name prefix.
143+
// If not provided, default metric prefix value is `kafka_cronsumer`.
144+
// Currently, there are two exposed prometheus metrics. `retried_messages_total_current` and `discarded_messages_total_current`.
145+
// So, if default metric prefix used, metrics names are `kafka_cronsumer_retried_messages_total_current` and
146+
// `kafka_cronsumer_discarded_messages_total_current`.
147+
MetricPrefix string
148+
134149
SASL *SASLConfig
135150
TLS *TLSConfig
136151
ClientID string

examples/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ services:
4949
command: "bash -c 'echo Waiting for Kafka to be ready... && \
5050
cub kafka-ready -b kafka:9092 1 20 && \
5151
kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
52+
kafka-topics --create --topic another-standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
5253
kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
5354
kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
5455
sleep infinity'"
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
If you run this example and go to http://localhost:8000/metrics,
2+
3+
you can see first and second consumers metrics as shown below
4+
5+
```
6+
# HELP first_discarded_messages_total_current Total number of discarded messages.
7+
# TYPE first_discarded_messages_total_current counter
8+
first_discarded_messages_total_current 0
9+
# HELP first_processed_messages_total_current Total number of processed messages.
10+
# TYPE first_processed_messages_total_current counter
11+
first_processed_messages_total_current 0
12+
# HELP first_retried_messages_total_current Total number of retried messages.
13+
# TYPE first_retried_messages_total_current counter
14+
first_retried_messages_total_current 0
15+
# HELP first_unprocessed_messages_total_current Total number of unprocessed messages.
16+
# TYPE first_unprocessed_messages_total_current counter
17+
first_unprocessed_messages_total_current 0
18+
# HELP second_discarded_messages_total_current Total number of discarded messages.
19+
# TYPE second_discarded_messages_total_current counter
20+
second_discarded_messages_total_current 0
21+
# HELP second_processed_messages_total_current Total number of processed messages.
22+
# TYPE second_processed_messages_total_current counter
23+
second_processed_messages_total_current 0
24+
# HELP second_retried_messages_total_current Total number of retried messages.
25+
# TYPE second_retried_messages_total_current counter
26+
second_retried_messages_total_current 0
27+
# HELP second_unprocessed_messages_total_current Total number of unprocessed messages.
28+
# TYPE second_unprocessed_messages_total_current counter
29+
second_unprocessed_messages_total_current 0
30+
```
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"github.com/gofiber/fiber/v2"
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
const port = 8000
10+
11+
func StartAPI(metricCollectors ...prometheus.Collector) {
12+
f := fiber.New(
13+
fiber.Config{
14+
DisableStartupMessage: true,
15+
DisableDefaultDate: true,
16+
DisableHeaderNormalizing: true,
17+
},
18+
)
19+
20+
metricMiddleware, err := NewMetricMiddleware(f, metricCollectors...)
21+
22+
if err == nil {
23+
f.Use(metricMiddleware)
24+
} else {
25+
fmt.Printf("metric middleware cannot be initialized: %v", err)
26+
}
27+
28+
fmt.Printf("server starting on port %d", port)
29+
30+
go listen(f)
31+
}
32+
33+
func listen(f *fiber.App) {
34+
if err := f.Listen(fmt.Sprintf(":%d", port)); err != nil {
35+
fmt.Printf("server cannot start on port %d, err: %v", port, err)
36+
}
37+
}

0 commit comments

Comments
 (0)