KAFKA-17248 - KIP 1076 implementation#17021
Conversation
4e8ffa7 to
d390f0a
Compare
61270fb to
b566d34
Compare
d6f9919 to
b96b967
Compare
There was a problem hiding this comment.
Thanks for the PR @bbejeck. I have taken a pass and some comments.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
|
Thanks for the comments @apoorvmittal10 - I've addressed your comments. |
mjsax
left a comment
There was a problem hiding this comment.
Not sure if I understand the end-to-end wiring already, so need to make another pass.
Btw: should we also verify state store metrics (or are they automatically verified as part of task metrics)?
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
They are covered as part of task metrics, but I expanded the test to explicitly verify state store metrics are correct as well |
mjsax
left a comment
There was a problem hiding this comment.
Thanks for the PR update. Made another pass.
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
020d7c4 to
965fd23
Compare
|
@mjsax thanks for the second review, I've addressed your comments |
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Show resolved
Hide resolved
965fd23 to
8d64d7d
Compare
b3d420d to
235ccb6
Compare
|
@mjsax, @apoorvmittal10, @AndrewJSchofield comments addressed |
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Show resolved
Hide resolved
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Just one remaining comment.
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
|
@AndrewJSchofield, thanks for the review; I've addressed your comment. |
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
… dependency, update import control, fix test
623cb7b to
21667fe
Compare
| final List<String> expectedMetrics = streams.metrics().values().stream().map(Metric::metricName) | ||
| .filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> { | ||
| final String name = mn.name().replace('-', '.'); | ||
| final String group = mn.group().replace("-metrics", "").replace('-', '.'); | ||
| return "org.apache.kafka." + group + "." + name; | ||
| }).sorted().collect(Collectors.toList()); |
There was a problem hiding this comment.
Needed to convert the metric names into the expected KIP-714 naming format
|
@mjsax rebased on trunk - ready for final review |
|
|
||
| final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream() | ||
| .filter(entry -> !entry.getKey().endsWith("-restore-consumer") | ||
| && !entry.getKey().endsWith("GlobalStreamThread")) |
There was a problem hiding this comment.
We don't use -global-consumer in the key for the global consumer?
This sounds like a bug to me -- checking the code, it seems we use getName() + "-consumer" inside StreamThread while we use getName() only for the global case inside KafkaStreams.
Out of scope for this PR, but I think we should fix it in a follow up PR (given that it is technically a breaking change, we should think about it twice though... But as we head into 4.0, it might be ok -- however, we should call it out in the upgrade docs and we need a Jira for it)
| 30_000, | ||
| "Never received subscribed metrics"); | ||
| final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId); | ||
| final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads"); |
There was a problem hiding this comment.
Unrelated, but "alive stream thread" seems to be missing in the docs: https://kafka.apache.org/documentation/#kafka_streams_client_monitoring
Would you be interested to do a follow up PR about it?
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
mjsax
left a comment
There was a problem hiding this comment.
Only some nits. Overall LGTM. Feel free to merge.
|
Merged #17021 into trunk |
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework Reviewers: Apoorv Mittal <amittal@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias Sax <mjsax@apache.org>
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework Reviewers: Apoorv Mittal <amittal@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias Sax <mjsax@apache.org>
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework
Integration and unit tests forthcoming
Committer Checklist (excluded from commit message)