KAFKA-17248 - KIP 1076 implementation #17021
base: trunk
Thanks for the PR @bbejeck. I have taken a pass and left some comments.
@@ -621,6 +631,9 @@ private KafkaAdminClient(AdminClientConfig config,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
Should we close the client telemetry reporter when the admin client is closed?
Shouldn't the reporters be closed when the admin client calls metrics.close()? (I would assume it correctly closes its metrics object?)
Following up, because I don't see code changes for this yet.
Added metrics.close() to KafkaAdmin.close(); metrics.close() does close all reporters.
Hmmm, but the close will not send the last telemetry push, because ClientTelemetryReporter needs an initiateClose method call to set its state to terminating_push_needed, which is what sends the last telemetry metrics to the broker. If we decide not to send the last telemetry push in the admin client, that's fine, but we should document it in a comment somewhere.
@apoorvmittal10 good catch, I've updated the close method to call initiateClose; the use in the admin client should follow the same semantics as the other clients.
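The close ordering discussed in this thread can be sketched with a small stand-in model. This is a hypothetical illustration, not the real ClientTelemetryReporter: the class names, the state enum, and the recorded `pushes` list are assumptions made for the example, and the final push is collapsed into `close()` for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a telemetry reporter; NOT the real ClientTelemetryReporter.
final class SketchTelemetryReporter {
    enum State { PUSH_IN_PROGRESS, TERMINATING_PUSH_NEEDED, TERMINATED }

    private State state = State.PUSH_IN_PROGRESS;
    // Records the pushes that would have gone to the broker (assumption for the sketch).
    final List<String> pushes = new ArrayList<>();

    // Marks that one final push is owed before shutdown.
    void initiateClose() {
        if (state == State.PUSH_IN_PROGRESS) {
            state = State.TERMINATING_PUSH_NEEDED;
        }
    }

    // Sends the terminating push only if initiateClose() was called first.
    void close() {
        if (state == State.TERMINATING_PUSH_NEEDED) {
            pushes.add("terminating");
        }
        state = State.TERMINATED;
    }

    State state() {
        return state;
    }
}

// Hypothetical admin client that follows the ordering agreed on in the thread.
final class SketchAdminClient {
    final SketchTelemetryReporter reporter = new SketchTelemetryReporter();

    void close() {
        reporter.initiateClose(); // request the last push first
        reporter.close();         // then release the reporter
    }
}
```

In this model, closing the reporter directly records no push, while going through `SketchAdminClient.close()` records exactly one terminating push, which is the difference the review comment points out.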
clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
clientTelemetryReporter.ifPresent(telemetryReporter -> telemetryReporter.contextChange(metricsContext));
Though it's good to have KIP-714 integrated with the admin client, I left it out last time because the admin client makes ad hoc connections. This will establish a telemetry connection to send metrics; do we require them? Secondly, does the admin client have any metrics of its own in Kafka? At the time of KIP-714, I didn't see any relevant metric for the admin client. Last month we added https://issues.apache.org/jira/browse/KAFKA-17239 to record node latency, which was related to https://issues.apache.org/jira/browse/KAFKA-17230. But I am not sure we should have the KIP-714 pipeline for this one admin metric.
Hence, should we register additional metrics here? It seems like additional overhead for ad hoc requests. Alternatively, if we want external applications to register additional metrics in the admin client anyway, then we might want to consider enabling the client telemetry reporter only if additional metrics are registered. Wdyt?
That's a good point, I'll take a look at doing that.
> do we require them?
Yes, as we want to use Admin to report KS instance metrics.
> Secondly, does the admin client have any metrics of its own in Kafka?
I would assume so? Why would we create a new Metrics object if we don't use it? (Even if I have to admit, the docs don't say anything about admin metrics... 🤔 -- maybe @cmccabe can help on this question?)
> we might want to consider only enabling client telemetry reporter if additional metrics are registered. Wdyt?
I guess we could do this -- but it's kinda odd to have the Admin config "enable push metrics" but not enable it if set to true...? -- If we don't want to enable it on the admin client, we should rather remove (or for now only deprecate) the existing config, and add a new one via KIP-1076 which would enable "application push metrics" and would only create a reporter if application metrics are registered -- otherwise, we end up with some weird behavior which can easily confuse users.
Btw: I was just looking into the consumer/producer code, and it seems that there we just create the reporter and add it to the reporters list before new Metrics is called -- this way, metricsContext will be "added" automatically to the reporter. Should simplify the code a little bit?
I'm leaning toward not doing this, as I agree with @mjsax. Plus, there's additional complexity in the case where metrics are unregistered. Should that trigger the mirror action of disabling metrics pushing?
> Btw: I was just looking into the consumer/producer code, and it seems that there we just create the reporter and add it to the reporters list before new Metrics is called -- this way, metricsContext will be "added" automatically to the reporter. Should simplify the code a little bit?
I'm not sure I follow this comment. Are you referring to the reporters we create in Kafka Streams to act as proxies for passing the metrics to the clients and suggesting we create them in the consumer/producer clients themselves?
Those Reporter instances are specific to Kafka Streams only. Also, the Metrics API provides an addReporter method, so it seems it was designed to add a reporter after instantiating the Metrics object.
So the only metric I think the admin client emits is the node latency metric, which was also added only recently, in connection with a regression. I understand that KS wants to send application metrics through the admin client, but we will be adding overhead to short-lived admin client operations. I am not fully aligned with integrating KIP-714 into the admin client. @AndrewJSchofield wdyt?
@apoorvmittal10 I spoke offline with @mjsax and what we've come up with is that we'll disable metrics in the AdminClient by default (I'll update KIP-1076 with this detail). KafkaStreams will explicitly enable the metrics push for the AdminClient, avoiding any additional overhead for non-KafkaStreams usage. WDYT?
That sounds fair to me. For my knowledge, does the admin client in a Streams application have a long-running connection over which metrics will be emitted? My concern is that if connections are ad hoc and we create too many short-lived admin client connections, it might impact the connection cache: the cache keeps a client instance for a maximum of push interval * 3 ms, so if too many admin client connections are created in a short span, it might hurt performance.
Sounds good to me too.
Kafka Streams also uses the admin client to send "delete record" requests for repartition topics. For this case, the connection should be long-lived, as we send a "delete record" request after each commit by default (it's actually configurable).
For the (rarer) case that there is no repartition topic, it could be a slightly different story, but IIRC, @bbejeck already did some tests verifying that the connection is not closed but long-lived, even for this case. So I think we are good?
We could run more tests before the 4.0 release to double check, and if we really find an issue, we could still update the KIP to let KS use a different connection timeout default, to make sure that the connection will be long-lived.
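The opt-in arrangement proposed in this thread (metrics push off by default for a plain admin client, turned on by Kafka Streams for its internal admin client) might look roughly like the following. This is only a sketch under stated assumptions: `enable.metrics.push` is the KIP-714 config key, but both helper methods are hypothetical, and respecting an explicit user-supplied `false` is an assumption rather than something the thread decided.

```java
import java.util.HashMap;
import java.util.Map;

final class AdminTelemetryConfigSketch {
    // Config key from KIP-714.
    static final String ENABLE_METRICS_PUSH = "enable.metrics.push";

    // Hypothetical helper: effective setting for an admin client,
    // assuming the default is "off" unless the config says otherwise.
    static boolean effectivePush(Map<String, Object> config) {
        Object value = config.get(ENABLE_METRICS_PUSH);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    // Hypothetical helper: the config Kafka Streams would hand to its
    // internal admin client. It opts in explicitly, but (assumption)
    // leaves an explicit user setting untouched.
    static Map<String, Object> streamsAdminConfig(Map<String, Object> userConfig) {
        Map<String, Object> config = new HashMap<>(userConfig);
        config.putIfAbsent(ENABLE_METRICS_PUSH, true);
        return config;
    }
}
```

With this shape, a stand-alone admin client built from an empty config pays no telemetry overhead, while the admin client created by Kafka Streams pushes metrics unless the user disabled it.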
Thanks for the comments @apoorvmittal10 - I've addressed your comments.
Not sure if I understand the end-to-end wiring yet, so I need to make another pass.
Btw: should we also verify state store metrics (or are they automatically verified as part of task metrics)?
They are covered as part of task metrics, but I expanded the test to explicitly verify state store metrics are correct as well.
Thanks for the PR update. Made another pass.
* - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
> benign
I just learned a new word :)
Same here.
@Test
@DisplayName("Calling unregisterMetric on metrics not registered should not cause an error")
public void shouldNotThrowExceptionWhenRemovingNonExistingMetrics() throws InterruptedException {
Do we need to test this with a heavyweight integration test? Or could we unit-test this in a more lightweight fashion?
I don't think so. I used this test with a "live" consumer because the call to remove metrics goes into the ClientTelemetryReporter, and I want to confirm no exceptions at this level. Mocking would miss this interaction.
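The contract this test guards (re-registering a metric is a benign update, and removing an unknown metric must not throw) can be illustrated with a minimal registry. This is a sketch only: the class and method names are hypothetical and do not mirror the client API added in this PR, and it models metrics as plain name/value pairs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry; models the register/unregister contract only.
final class SubscriptionRegistrySketch {
    private final Map<String, Double> metrics = new ConcurrentHashMap<>();

    // Registering an already-known name simply replaces its entry.
    void register(String name, double value) {
        metrics.put(name, value);
    }

    // Removing an unknown name is a no-op; returns whether an entry was removed.
    boolean unregister(String name) {
        return metrics.remove(name) != null;
    }

    int size() {
        return metrics.size();
    }

    Double value(String name) {
        return metrics.get(name);
    }
}
```

A unit test against a model like this would cover the map semantics, but not the interaction with the real reporter, which is the reason given above for keeping the integration test.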
@ParameterizedTest
@MethodSource("singleAndMultiTaskParameters")
@DisplayName("Streams metrics should get passed to Consumer")
Similar to above? Should we add this to StreamThreadTest as a unit test?
Possibly, but the KafkaStreams constructor creates the admin client and the Reporter instance to pass streams client metrics at this point as well. Given the integration test is needed anyway to verify metrics track with task assignment changes due to a rebalance, it isn't a burden to have this in an integration test.
}

@Test
@DisplayName("Streams metrics should not be visible in consumer metrics")
Sounds unit-testable via StreamThreadTest?
I'll leave it as is for the same reason as above. This test asserts that passing stream metrics to the consumer, which in turn adds them to the ClientTelemetryReporter, does not bleed over to the JMX reporter.
@mjsax thanks for the second review, I've addressed your comments.
}

private boolean isStreamsClientMetric(final KafkaMetric metric) {
final boolean shouldInclude = metric.metricName().group().equals("stream-metrics");
What about stream-thread-metrics, stream-task-metrics, stream-processor-node-metrics, stream-state-metrics and stream-record-cache-metrics? Are they not needed, or do they fall in the same stream-metrics group?
@apoorvmittal10, those groups are not part of the client-level grouping and are reported separately.
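The group-based filtering discussed here can be sketched as follows. The `Name` type is a minimal stand-in for a metric name with a group (the real code reads the group from `metric.metricName().group()`), and the helper class and the sample metric names used with it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class MetricGroupFilterSketch {
    // Group name taken from the diff above.
    static final String CLIENT_LEVEL_GROUP = "stream-metrics";

    // Minimal stand-in for a metric name carrying its group.
    static final class Name {
        final String name;
        final String group;

        Name(String name, String group) {
            this.name = name;
            this.group = group;
        }
    }

    // Mirrors the check in the diff: only the client-level group passes.
    static boolean isStreamsClientMetric(Name metric) {
        return CLIENT_LEVEL_GROUP.equals(metric.group);
    }

    // Keeps client-level metrics; thread/task/state groups are left
    // for whatever reports them separately.
    static List<Name> clientLevelOnly(List<Name> all) {
        List<Name> kept = new ArrayList<>();
        for (Name metric : all) {
            if (isStreamsClientMetric(metric)) {
                kept.add(metric);
            }
        }
        return kept;
    }
}
```

Under this filter, a metric in stream-thread-metrics or stream-task-metrics is simply skipped by the client-level reporter rather than dropped from the system.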
import java.util.Map;
import java.util.Objects;

public class StreamsClientMetricsDelegatingReporter implements MetricsReporter {
And do we expect this reporter to be additionally registered in the metrics.reporter server config in order to send Streams metrics?
Ahhh, I see it's added as a default metrics reporter on the metrics instance.
Correct metric filtering to only include metrics with thread-id Remove unused method
…cs infra totally for adding removing metrics.
…ove setting consumer to null in reporter close
…rics, change visibility on test methods to public
Added a new class `StreamsThreadMetricsDelegatingReporter` to handle metrics reporting for Kafka Streams threads. Updated method names across multiple files to improve clarity and consistency by changing `registerMetric` to `registerMetricForSubscription` and `unregisterMetric` to `unregisterMetricFromSubscription`. Additionally, included new tests to verify the changes.
Refactored `KafkaStreamsTelemetryIntegrationTest` to use a custom telemetry reporter, `TestingClientTelemetry`, to integrate telemetry with the test environment. Updated checkstyle to permit new telemetry-related imports.
@mjsax, @apoorvmittal10, @AndrewJSchofield comments addressed.
.map(Map.Entry::getValue)
.findFirst().get();
assertNotNull(adminInstanceId);
assertNotNull(mainConsumerInstanceId);
I have more to do with this test, but having it complete should not block the PR progress.
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework
Integration and unit tests forthcoming