Skip to content

Commit

Permalink
Simplify filtering for adding metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Aug 28, 2024
1 parent 9947822 commit d390f0a
Showing 1 changed file with 4 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StreamsDelegatingMetricsReporter implements MetricsReporter {

Expand All @@ -49,24 +48,20 @@ public void init(final List<KafkaMetric> metrics) {

@Override
public void metricChange(final KafkaMetric metric) {
if (filteredMetric(metric).isPresent()) {
if (tagMatchesCurrentThread(metric)) {
LOG.info("Registering metric {} for thread={}", metric.metricName().name(), threadId);
consumer.registerMetric(metric);
}
}

Optional<KafkaMetric> filteredMetric(final KafkaMetric kafkaMetric) {
boolean tagMatchesCurrentThread(final KafkaMetric kafkaMetric) {
final Map<String, String> tags = kafkaMetric.metricName().tags();
KafkaMetric maybeKafkaMetric = null;
if (tags.containsKey(THREAD_ID_TAG) && tags.get(THREAD_ID_TAG).equals(threadId)) {
maybeKafkaMetric = kafkaMetric;
}
return Optional.ofNullable(maybeKafkaMetric);
return tags.containsKey(THREAD_ID_TAG) && tags.get(THREAD_ID_TAG).equals(threadId);
}

@Override
public void metricRemoval(final KafkaMetric metric) {
if (filteredMetric(metric).isPresent()) {
if (tagMatchesCurrentThread(metric)) {
LOG.info("Unregistering metric {} for thread={}", metric.metricName().name(), threadId);
consumer.unregisterMetric(metric);
}
Expand Down

0 comments on commit d390f0a

Please sign in to comment.