We wanted to collect, graph, and alert on lag for the Kafka receiver, but observed unexpected behavior when comparing the last values of otelcol_kafka_receiver_offset_lag with the values reported by Kafka's consumer-groups utility.
Description
The last value of the otelcol_kafka_receiver_offset_lag measurement does not appear to be calculated correctly.
Also, for context, we are seeing an issue in the OTel Collector where it keeps emitting lag metrics for partitions it is no longer consuming.
We understand that topic is not present as a tag on this metric, per issue 35336.
Steps to Reproduce
We are emitting and collecting the otelcol_kafka_receiver_offset_lag metric in our collector deployments.
We are using Grafana to view this metric.
Upon querying for this metric's last value, we were surprised to see values that did not correspond to the output of Kafka's consumer-groups tooling.
Expected Result
Per this screenshot, the lag for partition 4 as shown by Kafka's consumer-groups.sh utility changes over time and stays in the low hundreds or smaller:
You can see the current and log-end offsets and the calculated lag (LAG = LOG-END-OFFSET - CURRENT-OFFSET) values of 117, 74, 112, and 129.
Actual Result
Per this screenshot, the lag reported for partition 4 does not match the true lag reported by Kafka's tools:
Query in Grafana query builder:
Query as Grafana is running it:
We think the reason for the incorrect data is that the gauge remains registered in OTel's metrics registry even after a rebalance, so the series stops receiving updates but keeps reporting its last value.
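To illustrate the suspected mechanism outside the collector, here is a minimal, self-contained sketch against the OTel Go SDK. It assumes cumulative temporality (what a Prometheus-style pipeline effectively sees), and the meter, metric, and attribute names are made up for the repro rather than taken from the receiver:

package main

import (
    "context"
    "time"

    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
    "go.opentelemetry.io/otel/metric"
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
    ctx := context.Background()

    exporter, err := stdoutmetric.New()
    if err != nil {
        panic(err)
    }
    // Periodic reader with the default (cumulative) temporality.
    provider := sdkmetric.NewMeterProvider(
        sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(2*time.Second))),
    )
    defer func() { _ = provider.Shutdown(ctx) }()

    // Stand-in for the receiver's lag gauge; names are invented for this repro.
    lag, err := provider.Meter("lag-repro").Int64Gauge("kafka_receiver_offset_lag")
    if err != nil {
        panic(err)
    }

    // Before the "rebalance": this consumer owns partitions 4 and 5.
    lag.Record(ctx, 117, metric.WithAttributes(attribute.String("partition", "4")))
    lag.Record(ctx, 42, metric.WithAttributes(attribute.String("partition", "5")))

    // After the "rebalance": partition 4 moved away, so only partition 5 keeps
    // getting updates -- nothing ever removes the partition="4" attribute set.
    for i := 0; i < 3; i++ {
        lag.Record(ctx, int64(10*(i+1)), metric.WithAttributes(attribute.String("partition", "5")))
        time.Sleep(3 * time.Second)
    }
}

If the cumulative last-value behavior holds, every export keeps a partition="4" data point at its last value (117) even though nothing records it anymore, which matches the stale readings we see in Grafana.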
From Sarama's ConsumerGroupHandler documentation: "Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time."
Cleanup(ConsumerGroupSession) error
We have to be careful about using the Cleanup func to solve this issue. By the time Cleanup is called, the session's claims may no longer reflect the partitions that were assigned, or session.Claims() might be empty. We may need to check that assumption as well.
That said, I agree with your approach. As long as we are guaranteed that the session is still accurately populated when Cleanup() is called, we can do something like this:
for topic, partitions := range session.Claims() {
    for _, partition := range partitions {
        c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(
            attribute.String(attrInstanceName, c.id.Name()),
            attribute.String(attrTopic, topic),
            attribute.String(attrPartition, strconv.Itoa(int(partition))),
        ))
        // add cleanup for the offset_lag metric here as well
    }
}
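If session.Claims() turns out to be unreliable by the time Cleanup runs, one alternative is to snapshot the assignment in Setup and drive the cleanup from that snapshot. A minimal, self-contained sketch with hypothetical names (this is not the receiver's actual handler):

package main

import "github.com/IBM/sarama"

// lagCleanupHandler is a sketch: it copies the assignment in Setup so that
// Cleanup does not depend on session.Claims() still being populated at the
// end of the session. All names here are hypothetical.
type lagCleanupHandler struct {
    assignedClaims map[string][]int32 // topic -> partitions for this generation
}

var _ sarama.ConsumerGroupHandler = (*lagCleanupHandler)(nil)

func (h *lagCleanupHandler) Setup(session sarama.ConsumerGroupSession) error {
    // Snapshot the claims for this generation up front.
    h.assignedClaims = session.Claims()
    return nil
}

func (h *lagCleanupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        session.MarkMessage(msg, "")
    }
    return nil
}

func (h *lagCleanupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
    for topic, partitions := range h.assignedClaims {
        for _, partition := range partitions {
            // This is where the receiver could emit partition-close telemetry
            // and clear or zero the offset-lag gauge for (topic, partition).
            _, _ = topic, partition
        }
    }
    return nil
}

func main() {} // placeholder so the sketch compiles on its own

Either way, it is worth verifying on a real rebalance what session.Claims() actually contains during Cleanup before settling on an approach.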
Component(s)
receiver/kafka
Collector version
otelcol_version: 0.109.0
Environment information
Environment
OS: linux (container)
OpenTelemetry Collector configuration
Log output
Additional context
Where this gauge is defined: opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata/generated_telemetry.go, lines 75 to 79 at commit 0d28558
One of the places where this gauge is updated: opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go, line 560 at commit 0d28558
Where this could be addressed: opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go, lines 529 to 531 at commit 0d28558
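One note on what "cleanup" of the lag metric can mean at those lines: the OTel Go metric API has no way to delete a single attribute set from a synchronous gauge, so with the current instrument the closest Cleanup can get is to overwrite the stale series with a final value. A hedged sketch of just that mechanics, with a hypothetical helper name and assumed attribute keys (not the receiver's API):

package lagcleanup // hypothetical helper package, not collector code

import (
    "context"

    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
)

// zeroLagForRevokedPartition records a final value of 0 for the given
// partition's attribute set. Because synchronous gauges offer no per-series
// deletion, overwriting the last value is the closest an instrumented
// component can get to "removing" the stale series. Attribute keys are
// assumptions made for this sketch.
func zeroLagForRevokedPartition(ctx context.Context, lagGauge metric.Int64Gauge, instanceName, partition string) {
    lagGauge.Record(ctx, 0, metric.WithAttributes(
        attribute.String("name", instanceName),
        attribute.String("partition", partition),
    ))
}

Alternatives such as switching the metric to an asynchronous/observable gauge (which only exports what the callback observes each cycle) or handling staleness at the exporter level would be larger changes than this sketch covers.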