Skip to content

Commit

Permalink
Disable metrics push in AdminClient by default; update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Sep 27, 2024
1 parent 17d12db commit 6d8db41
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public class AdminClientConfig extends AbstractConfig {
RETRY_BACKOFF_MAX_MS_DOC)
.define(ENABLE_METRICS_PUSH_CONFIG,
Type.BOOLEAN,
true,
false,
Importance.LOW,
ENABLE_METRICS_PUSH_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
Expand Down Expand Up @@ -445,7 +446,7 @@ public void testMetricsReporterAutoGeneratedClientId() {
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) admin.metrics.reporters().get(0);

assertEquals(admin.getClientId(), mockMetricsReporter.clientId);
assertEquals(3, admin.metrics.reporters().size());
assertEquals(2, admin.metrics.reporters().size());
admin.close();
}

Expand All @@ -455,7 +456,6 @@ public void testDisableJmxReporter() {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(AdminClientConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
props.setProperty(CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
assertTrue(admin.metrics.reporters().isEmpty());
admin.close();
Expand All @@ -466,9 +466,21 @@ public void testExplicitlyEnableJmxReporter() {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
props.setProperty(CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG, "true");
KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
assertEquals(1, admin.metrics.reporters().size());
admin.close();
}

@Test
public void testExplicitlyEnableTelemetryReporter() {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
props.setProperty(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
assertEquals(2, admin.metrics.reporters().size());
//ClientTelemetryReporter always added after metrics reporters created with JmxReporter
assertInstanceOf(ClientTelemetryReporter.class, admin.metrics.reporters().get(1));
admin.close();
}

Expand Down Expand Up @@ -8103,7 +8115,7 @@ public void testFenceProducers() throws Exception {

@Test
public void testClientInstanceId() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) {
Uuid expected = Uuid.randomUuid();

GetTelemetrySubscriptionsResponseData responseData =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,7 @@ public Map<String, Object> getAdminConfigs(final String clientId) {

// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
props.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);

return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -391,6 +392,7 @@ public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> conf

@Override
public Admin getAdmin(final Map<String, Object> config) {
assertTrue((Boolean) config.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
final TestingMetricsInterceptingAdminClient adminClient = new TestingMetricsInterceptingAdminClient(config);
INTERCEPTING_ADMIN_CLIENTS.add(adminClient);
return adminClient;
Expand Down

0 comments on commit 6d8db41

Please sign in to comment.