
KAFKA-17248 - KIP 1076 implementation #17021

Open
wants to merge 33 commits into trunk
Conversation

bbejeck
Contributor

@bbejeck bbejeck commented Aug 27, 2024

Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework

Integration and unit tests forthcoming

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@bbejeck bbejeck marked this pull request as draft August 27, 2024 22:26
@bbejeck bbejeck added streams kip Requires or implements a KIP KIP-1076 Metrics for client applications KIP-714 extension labels Aug 27, 2024
@bbejeck bbejeck changed the title [WIP NO MERGE] KAFKA-17248 - KIP 1076 implementation KAFKA-17248 - KIP 1076 implementation Sep 2, 2024
@bbejeck bbejeck marked this pull request as ready for review September 3, 2024 20:33
Collaborator

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the PR @bbejeck. I have taken a pass and some comments.

@@ -621,6 +631,9 @@ private KafkaAdminClient(AdminClientConfig config,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
Collaborator

Should we add close of the client telemetry reporter on admin client close?

Member

Should reporters not be closed when the admin client calls metrics.close()? (I would assume it does correctly close its metrics object?)

Following up, because I don't see code changes about it yet.

Contributor Author

Added metrics.close() to the KafkaAdmin.close() - metrics.close() does close all reporters.

Collaborator

Hmmm, but close will not send the last telemetry push, as ClientTelemetryReporter needs an initiateClose method call to set its state to terminating_push_needed, which sends the last telemetry metrics to the broker. However, if we decide not to send the last telemetry push in the admin client, that's fine, but we should note it in comments somewhere.

Contributor Author

@bbejeck bbejeck Sep 30, 2024

@apoorvmittal10 good catch, I've updated the close method to call initiateClose; the admin client should now follow the same close semantics as the other clients.
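The close ordering agreed on above can be sketched with stub types (hypothetical names, not the real Kafka classes): `initiateClose` triggers the terminating telemetry push while the network path is still alive, and only afterwards does `metrics.close()` tear down all registered reporters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hedged sketch of the close ordering, NOT the actual KafkaAdminClient code.
// The "calls" list records the order of operations so it can be inspected.
public class AdminCloseSketch {
    interface TelemetryReporter {
        void initiateClose(long timeoutMs); // would send the terminating push
    }

    public final List<String> calls = new ArrayList<>();
    private final Optional<TelemetryReporter> telemetryReporter;

    public AdminCloseSketch() {
        this.telemetryReporter = Optional.of(t -> calls.add("initiateClose"));
    }

    public void close(long timeoutMs) {
        // 1. ask the reporter to push its final metrics while it still can
        telemetryReporter.ifPresent(r -> r.initiateClose(timeoutMs));
        // 2. closing the metrics object closes every registered reporter,
        //    including the telemetry reporter itself
        calls.add("metrics.close");
    }
}
```

The point of the ordering is that reversing the two steps would close the reporter before it had a chance to push its final metrics.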

Comment on lines 554 to 555
clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
clientTelemetryReporter.ifPresent(telemetryReporter -> telemetryReporter.contextChange(metricsContext));
Collaborator

Though it's good to have the integration of KIP-714 with the admin client, I left it out last time because of the ad hoc connections made by the admin client; this change will establish a telemetry connection to send metrics. Do we require them? Secondly, does the admin client have any metrics of its own in Kafka? At the time of KIP-714, I didn't see any relevant metric for the admin client. Last month we added https://issues.apache.org/jira/browse/KAFKA-17239 to record node latency, which was related to https://issues.apache.org/jira/browse/KAFKA-17230. But again, should we have the KIP-714 pipeline for this one admin metric? I am not sure.

Hence, should we register additional metrics here? It seems like additional overhead for ad hoc requests. Alternatively, if we want external applications to register additional metrics in the admin client anyway, then we might want to consider enabling the client telemetry reporter only if additional metrics are registered. Wdyt?

Contributor Author

That's a good point, I'll take a look at doing that.

Member

do we require them?

Yes, as we want to use Admin to report KS instance metrics.

Secondly, does admin client has any metric of it's own in Kafka

I would assume so? Why would we create a new Metrics object if we don't use it? (Even if I have to admit, the docs don't say anything about admin metrics... 🤔 -- maybe @cmccabe can help on this question?)

we might want to consider only enabling client telemetry reporter if additional metrics are registered. Wdyt?

I guess we could do this -- but it's kinda odd to have the Admin config "enable push metrics" but not enable it if set to true...? -- If we don't want to enable it on the admin client, we should rather remove (or for now only deprecate) the existing config, and add a new one via KIP-1076 which would enable "application push metrics" which would only create a reporter if application metrics are registered -- otherwise, we end up with some weird behavior which can easily confuse users.

Btw: I was just looking into the consumer/producer code, and it seems there we just create the reporter and add it to the reporters list before new Metrics is called -- this way, the metricsContext will be "added" to the reporter automatically. Should simplify the code a little bit?

Contributor Author

I'm leaning toward not doing this, as I agree with @mjsax. Plus, there's additional complexity in the case where metrics are unregistered. Should that trigger the mirror action of disabling metrics pushing?

Btw: I was just looking into the consumer/producer code, and it seem there we just create the reporter and add to reporters list before new Metrics is called -- this way, metricsContext will be "added" automatically to the reporter. Should simplify the code a little bit?

I'm not sure I follow this comment. Are you referring to the reporters we create in Kafka Streams to act as proxies for passing the metrics to the clients and suggesting we create them in the consumer/producer clients themselves?
Those Reporter instances are specific to Kafka Streams only. Also, the Metrics API provides an addReporter method, so it seems it was designed to add a reporter after instantiating the Metrics object.
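The two wiring styles in this exchange can be contrasted with a stub sketch (hypothetical `MetricsWiringSketch`, not the real `org.apache.kafka.common.metrics.Metrics`): reporters passed at construction pick up the metrics context as part of construction, while a reporter attached later via `addReporter` has the context propagated to it at attach time.

```java
import java.util.ArrayList;
import java.util.List;

// Stub sketch contrasting the two reporter wiring styles discussed above.
// Class and method names are illustrative, not the actual Kafka API.
public class MetricsWiringSketch {
    interface Reporter {
        void contextChange(String context); // receives the metrics context
    }

    private final List<Reporter> reporters = new ArrayList<>();
    private final String context;

    // Style 1: reporters handed to the constructor get the context
    // applied automatically as part of construction.
    public MetricsWiringSketch(List<Reporter> initial, String context) {
        this.context = context;
        for (Reporter r : initial) {
            addReporter(r);
        }
    }

    // Style 2: a reporter attached after construction; the context is
    // pushed to it here, at attach time.
    public void addReporter(Reporter reporter) {
        reporter.contextChange(context);
        reporters.add(reporter);
    }

    public int reporterCount() {
        return reporters.size();
    }
}
```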

Collaborator

So the only metric I think the admin client emits is the node latency metric, which also got added recently to address a regression. Though I understand that KS wants to send application metrics as part of the admin client, we would be adding additional overhead to short-lived admin client operations. I am not fully aligned on the KIP-714 integration in the admin client. @AndrewJSchofield wdyt?

Contributor Author

@apoorvmittal10 I spoke offline with @mjsax and what we've come up with is that we'll disable metrics in the AdminClient by default (I'll update KIP-1076 with this detail). KafkaStreams will explicitly enable the metrics push for the AdminClient, avoiding any additional overhead for non-KafkaStreams usage. WDYT?

Collaborator

@apoorvmittal10 apoorvmittal10 Sep 27, 2024

That sounds fair to me. For my knowledge, does the admin client in a Streams application have a long-running connection over which metrics will be emitted? My concern is that if connections are ad hoc and we create too many short-lived admin client connections, it might impact the connection cache, i.e. the connection cache keeps the client instance for a maximum of push interval * 3 ms, so if too many admin client connections are created in a short span it might hurt performance.

Contributor

Sounds good to me too.

Member

Kafka Streams also uses the admin client to send "delete record" requests for repartition topics. For this case, the connection should be long lived, as we send a "delete record" request after each commit by default (it's actually configurable).

For the (rarer) case that there is no repartition topic, it could be a slightly different story, but IIRC @bbejeck did some tests already, verifying that the connection is not closed but stays long lived, even for this case. So I think we are good?

We could run more tests before the 4.0 release to double-check, and if we really find an issue, we could still update the KIP to let KS use a different connection timeout default, to make sure that the connection stays long lived.

@bbejeck
Contributor Author

bbejeck commented Sep 5, 2024

Thanks for the comments @apoorvmittal10 - I've addressed your comments.

Member

@mjsax mjsax left a comment

Not sure if I understand the end-to-end wiring already, so need to make another pass.

Btw: should we also verify state store metrics (or are they automatically verified as part of task metrics)?

@bbejeck
Contributor Author

bbejeck commented Sep 9, 2024

should we also verify state store metrics (or are they automatically verified as part of task metrics)?

They are covered as part of task metrics, but I expanded the test to explicitly verify state store metrics are correct as well

Member

@mjsax mjsax left a comment

Thanks for the PR update. Made another pass.

* - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metric's entry.
Member

benign

I just learned a new word :)

Collaborator

Same here.
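The register/unregister semantics described in the javadoc above behave like a map overwrite and remove; a minimal sketch under those assumptions (hypothetical `SubscriptionSketch`, not the actual `ClientTelemetryReporter`):

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the subscription semantics from the javadoc above,
// NOT the actual Kafka implementation.
public class SubscriptionSketch {
    enum MetricType { SUM, GAUGE, OTHER }

    private final Map<String, MetricType> subscribed = new HashMap<>();

    // Re-registering the same name is benign: the entry is simply
    // overwritten. Types other than SUM and GAUGE are silently ignored,
    // mirroring "Metrics not matching these types are silently ignored".
    public void registerMetricForSubscription(String name, MetricType type) {
        if (type == MetricType.SUM || type == MetricType.GAUGE) {
            subscribed.put(name, type);
        }
    }

    // Unregistering a metric that was never registered is a no-op,
    // not an error.
    public void unregisterMetricFromSubscription(String name) {
        subscribed.remove(name);
    }

    public int size() {
        return subscribed.size();
    }
}
```

This matches the test just below: calling unregister on a metric that was never registered must not throw.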


@Test
@DisplayName("Calling unregisterMetric on metrics not registered should not cause an error")
public void shouldNotThrowExceptionWhenRemovingNonExistingMetrics() throws InterruptedException {
Member

Do we need to test this with a heavy-weight integration test? Or could we unit-test this in a more light weight fashion?

Contributor Author

I don't think so. I used this test with a "live" consumer because the call to remove metrics goes into the ClientTelemetryReporter, and I want to confirm no exceptions at this level. Mocking would miss this interaction.


@ParameterizedTest
@MethodSource("singleAndMultiTaskParameters")
@DisplayName("Streams metrics should get passed to Consumer")
Member

Similar to above? Should we add this to StreamThreadTest as unit-test?

Contributor Author

Possibly, but the KafkaStreams constructor creates the admin client and the Reporter instance to pass streams client metrics at this point as well. Given the integration test is needed anyway to verify metrics track with task assignment changes due to a rebalance, it isn't a burden to have this in an integration test.

}

@Test
@DisplayName("Streams metrics should not be visible in consumer metrics")
Member

Sounds unit-testable via StreamThreadTest ?

Contributor Author

@bbejeck bbejeck Sep 13, 2024

I'll leave it as is for the same reason as above. This test asserts that passing streams metrics to the consumer, which in turn adds them to the ClientTelemetryReporter, does not bleed over into the JMX reporter.

@bbejeck
Contributor Author

bbejeck commented Sep 13, 2024

@mjsax thanks for the second review, I've addressed your comments

}

private boolean isStreamsClientMetric(final KafkaMetric metric) {
final boolean shouldInclude = metric.metricName().group().equals("stream-metrics");
Collaborator

What about stream-thread-metrics, stream-task-metrics, stream-processor-node-metrics, stream-state-metrics and stream-record-cache-metrics? Are they not needed, or do they fall into the same stream-metrics group?

Contributor Author

@apoorvmittal10, those groups are not part of the client-level grouping and get reported separately.

import java.util.Map;
import java.util.Objects;

public class StreamsClientMetricsDelegatingReporter implements MetricsReporter {
Collaborator

And are we expecting this reporter to be registered additionally in the metrics.reporter server config to send Streams metrics?

Collaborator

Ahhh, I see it's added as default metrics reporter on metrics instance.
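The delegating-reporter idea can be sketched under the same assumptions (stub `DelegatingReporterSketch` with hypothetical metric names, not the real `StreamsClientMetricsDelegatingReporter`): a reporter registered on the Streams Metrics instance forwards only the client-level `stream-metrics` group to the embedded client's telemetry subscription.

```java
// Hedged sketch of the delegating reporter discussed above, NOT the
// actual Kafka Streams implementation. The sink stands in for the
// embedded client's registerMetricForSubscription call.
public class DelegatingReporterSketch {
    interface TelemetrySink {
        void registerMetricForSubscription(String metricName);
    }

    private final TelemetrySink sink;

    public DelegatingReporterSketch(TelemetrySink sink) {
        this.sink = sink;
    }

    // Invoked for each metric the Streams Metrics instance adds; only the
    // client-level "stream-metrics" group is forwarded, while thread/task/
    // state-store groups are reported via their own embedded clients.
    public void metricChange(String group, String metricName) {
        if ("stream-metrics".equals(group)) {
            sink.registerMetricForSubscription(metricName);
        }
    }
}
```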

…ove setting consumer to null in reporter close
…rics, change visibility on test methods to public
Added a new class `StreamsThreadMetricsDelegatingReporter` to handle metrics reporting for Kafka Streams threads. Updated method names across multiple files to improve clarity and consistency by changing `registerMetric` to `registerMetricForSubscription` and `unregisterMetric` to `unregisterMetricFromSubscription`. Additionally, included new tests to verify the changes.
Refactored `KafkaStreamsTelemetryIntegrationTest` to use a custom telemetry reporter, `TestingClientTelemetry`, to integrate telemetry with the test environment. Updated checkstyle to permit new telemetry-related imports.
@github-actions github-actions bot added the core Kafka Broker label Sep 28, 2024
@bbejeck
Contributor Author

bbejeck commented Sep 30, 2024

@mjsax, @apoorvmittal10, @AndrewJSchofield comments addressed

.map(Map.Entry::getValue)
.findFirst().get();
assertNotNull(adminInstanceId);
assertNotNull(mainConsumerInstanceId);
Contributor Author

I have more to do with this test, but having it complete should not block the PR progress.
