Skip to content

Commit a5a8628

Browse files
author
A.Samet İleri
authored
feat: expose error metric when fetching message (#132)
* feat: expose error metric when fetching message * chore: fix lint
1 parent ca225fb commit a5a8628

File tree

5 files changed

+27
-8
lines changed

5 files changed

+27
-8
lines changed

README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ Kafka Konsumer offers an API that handles exposing several metrics.
280280

281281
### Exposed Metrics
282282

283-
| Metric Name | Description | Value Type |
284-
|---------------------------------------------------|---------------------------------------|------------|
285-
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
286-
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
283+
| Metric Name | Description | Value Type |
284+
|------------------------------------------------------------------|------------------------------------------------|------------|
285+
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
286+
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
287+
| kafka_konsumer_error_count_during_fetching_message_total_current | Total number of error during fetching message. | Counter |

collector.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ const Name = "kafka_konsumer"
1111
type MetricCollector struct {
1212
consumerMetric *ConsumerMetric
1313

14-
totalUnprocessedMessagesCounter *prometheus.Desc
15-
totalProcessedMessagesCounter *prometheus.Desc
14+
totalUnprocessedMessagesCounter *prometheus.Desc
15+
totalProcessedMessagesCounter *prometheus.Desc
16+
totalErrorCountDuringFetchingMessage *prometheus.Desc
1617
}
1718

1819
func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *MetricCollector {
@@ -35,6 +36,12 @@ func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *Me
3536
emptyStringList,
3637
nil,
3738
),
39+
totalErrorCountDuringFetchingMessage: prometheus.NewDesc(
40+
prometheus.BuildFQName(metricPrefix, "error_count_during_fetching_message_total", "current"),
41+
"Total number of error during fetching message.",
42+
emptyStringList,
43+
nil,
44+
),
3845
}
3946
}
4047

@@ -58,6 +65,13 @@ func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) {
5865
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
5966
emptyStringList...,
6067
)
68+
69+
ch <- prometheus.MustNewConstMetric(
70+
s.totalErrorCountDuringFetchingMessage,
71+
prometheus.CounterValue,
72+
float64(s.consumerMetric.TotalErrorCountDuringFetchingMessage),
73+
emptyStringList...,
74+
)
6175
}
6276

6377
func NewMetricMiddleware(cfg *ConsumerConfig,

consumer_base.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ func (c *base) startConsume() {
199199
if c.context.Err() != nil {
200200
continue
201201
}
202+
203+
c.metric.TotalErrorCountDuringFetchingMessage++
202204
c.logger.Warnf("Message could not read, err %s", err.Error())
203205
continue
204206
}

consumer_base_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func Test_base_startConsume(t *testing.T) {
2222
pause: make(chan struct{}),
2323
logger: NewZapLogger(LogLevelError),
2424
consumerState: stateRunning,
25+
metric: &ConsumerMetric{},
2526
}
2627
b.context, b.cancelFn = context.WithCancel(context.Background())
2728

metric.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kafka
22

33
type ConsumerMetric struct {
4-
TotalUnprocessedMessagesCounter int64
5-
TotalProcessedMessagesCounter int64
4+
TotalUnprocessedMessagesCounter int64
5+
TotalProcessedMessagesCounter int64
6+
TotalErrorCountDuringFetchingMessage int64
67
}

0 commit comments

Comments
 (0)