-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-17248 - KIP 1076 implementation #17021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
4e8ffa7
to
d390f0a
Compare
61270fb
to
b566d34
Compare
d6f9919
to
b96b967
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @bbejeck. I have taken a pass and some comments.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
Thanks for the comments @apoorvmittal10 - I've addressed your comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I understand the end-to-end wiring already, so need to make another pass.
Btw: should we also verify state store metrics (or are they automatically verified as part of task metrics)?
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
They are covered as part of task metrics, but I expanded the test to explicitly verify state store metrics are correct as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR update. Made another pass.
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
020d7c4
to
965fd23
Compare
@mjsax thanks for the second review, I've addressed your comments |
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
965fd23
to
8d64d7d
Compare
b3d420d
to
235ccb6
Compare
@mjsax, @apoorvmittal10, @AndrewJSchofield comments addressed |
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one remaining comment.
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
@AndrewJSchofield, thanks for the review; I've addressed your comment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
… dependency, update import control, fix test
623cb7b
to
21667fe
Compare
final List<String> expectedMetrics = streams.metrics().values().stream().map(Metric::metricName) | ||
.filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> { | ||
final String name = mn.name().replace('-', '.'); | ||
final String group = mn.group().replace("-metrics", "").replace('-', '.'); | ||
return "org.apache.kafka." + group + "." + name; | ||
}).sorted().collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to convert the metric names into the expected KIP-714 naming format
@mjsax rebased on trunk - ready for final review |
|
||
final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream() | ||
.filter(entry -> !entry.getKey().endsWith("-restore-consumer") | ||
&& !entry.getKey().endsWith("GlobalStreamThread")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use -global-consumer
in the key for the global consumer?
This sounds like a bug to me -- checking the code, it seems we use getName() + "-consumer"
inside StreamThread
while we use getName()
only for the global case inside KafkaStreams
.
Out of scope for this PR, but I think we should fix it in a follow up PR (given that it is technically a breaking change, we should think about it twice though... But as we head into 4.0, it might be ok -- however, we should call it out in the upgrade docs and we need a Jira for it)
30_000, | ||
"Never received subscribed metrics"); | ||
final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId); | ||
final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated, but "alive stream thread" seems to be missing in the docs: https://kafka.apache.org/documentation/#kafka_streams_client_monitoring
Would you be interested to do a follow up PR about it?
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only some nits. Overall LGTM. Feel free to merge.
Merged #17021 into trunk |
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>, Matthias Sax <[email protected]>
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>, Matthias Sax <[email protected]>
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework
Integration and unit tests forthcoming
Committer Checklist (excluded from commit message)