diff --git a/build.gradle b/build.gradle index d062351938a77..44194856dddc9 100644 --- a/build.gradle +++ b/build.gradle @@ -2639,7 +2639,7 @@ project(':streams') { } dependencies { - api project(':clients') + api project(path: ':clients', configuration: 'shadow') // `org.rocksdb.Options` is part of Kafka Streams public api via `RocksDBConfigSetter` api libs.rocksDBJni @@ -2659,6 +2659,7 @@ project(':streams') { testImplementation libs.mockitoCore testImplementation libs.mockitoJunitJupiter // supports MockitoExtension testImplementation libs.junitPlatformSuiteEngine // supports suite test + testImplementation project(':group-coordinator') testRuntimeOnly project(':streams:test-utils') testRuntimeOnly runtimeTestLibs diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 77d668c699465..36159347f4dd3 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -418,6 +418,11 @@ + + + + + diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 30fdac4687d96..7c7cbb99baaa3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -153,7 +153,6 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName; import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; -import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; @@ -228,8 +227,6 @@ import org.apache.kafka.common.requests.ElectLeadersResponse; import 
org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; -import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; -import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.JoinGroupRequest; @@ -254,6 +251,8 @@ import org.apache.kafka.common.security.scram.internals.ScramFormatter; import org.apache.kafka.common.security.token.delegation.DelegationToken; import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.KafkaThread; @@ -409,6 +408,7 @@ public class KafkaAdminClient extends AdminClient { private final boolean clientTelemetryEnabled; private final MetadataRecoveryStrategy metadataRecoveryStrategy; private final AdminFetchMetricsManager adminFetchMetricsManager; + private final Optional clientTelemetryReporter; /** * The telemetry requests client instance id. 
@@ -529,6 +529,7 @@ static KafkaAdminClient createInternal( String clientId = generateClientId(config); ApiVersions apiVersions = new ApiVersions(); LogContext logContext = createLogContext(clientId); + Optional clientTelemetryReporter; try { // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it @@ -540,6 +541,8 @@ static KafkaAdminClient createInternal( adminAddresses.usingBootstrapControllers()); metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds()); List reporters = CommonClientConfigs.metricsReporters(clientId, config); + clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + clientTelemetryReporter.ifPresent(reporters::add); Map metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) @@ -557,10 +560,13 @@ static KafkaAdminClient createInternal( time, 1, (int) TimeUnit.HOURS.toMillis(1), + null, metadataManager.updater(), - (hostResolver == null) ? new DefaultHostResolver() : hostResolver); + (hostResolver == null) ? 
new DefaultHostResolver() : hostResolver, + null, + clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient, - timeoutProcessorFactory, logContext); + timeoutProcessorFactory, logContext, clientTelemetryReporter); } catch (Throwable exc) { closeQuietly(metrics, "Metrics"); closeQuietly(networkClient, "NetworkClient"); @@ -575,12 +581,13 @@ static KafkaAdminClient createInternal(AdminClientConfig config, Time time) { Metrics metrics = null; String clientId = generateClientId(config); - + Optional clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + try { metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time); LogContext logContext = createLogContext(clientId); return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, - client, null, logContext); + client, null, logContext, clientTelemetryReporter); } catch (Throwable exc) { closeQuietly(metrics, "Metrics"); throw new KafkaException("Failed to create new KafkaAdminClient", exc); @@ -598,7 +605,8 @@ private KafkaAdminClient(AdminClientConfig config, Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory, - LogContext logContext) { + LogContext logContext, + Optional clientTelemetryReporter) { this.clientId = clientId; this.log = logContext.logger(KafkaAdminClient.class); this.logContext = logContext; @@ -622,6 +630,9 @@ private KafkaAdminClient(AdminClientConfig config, retryBackoffMaxMs, CommonClientConfigs.RETRY_BACKOFF_JITTER); this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG); + List reporters = CommonClientConfigs.metricsReporters(this.clientId, config); + this.clientTelemetryReporter = clientTelemetryReporter; + this.clientTelemetryReporter.ifPresent(reporters::add); this.metadataRecoveryStrategy = 
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG)); this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics); config.logUnused(); @@ -664,6 +675,8 @@ public void close(Duration timeout) { long now = time.milliseconds(); long newHardShutdownTimeMs = now + waitTimeMs; long prev = INVALID_SHUTDOWN_TIME; + clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose); + metrics.close(); while (true) { if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) { if (prev == INVALID_SHUTDOWN_TIME) { @@ -4272,12 +4285,18 @@ private KafkaFutureImpl> getMembersFromGroup(String groupId @Override public void registerMetricForSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricChange(metric); + } } @Override public void unregisterMetricFromSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricRemoval(metric); + } } @Override @@ -5050,47 +5069,16 @@ public Uuid clientInstanceId(Duration timeout) { throw new IllegalArgumentException("The timeout cannot be negative."); } - if (!clientTelemetryEnabled) { + if (clientTelemetryReporter.isEmpty()) { throw new IllegalStateException("Telemetry is not enabled. 
Set config `" + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`."); + } if (clientInstanceId != null) { return clientInstanceId; } - final long now = time.milliseconds(); - final KafkaFutureImpl future = new KafkaFutureImpl<>(); - runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()), - new LeastLoadedNodeProvider()) { - - @Override - GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) { - return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true); - } - - @Override - void handleResponse(AbstractResponse abstractResponse) { - GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse; - if (response.error() != Errors.NONE) { - future.completeExceptionally(response.error().exception()); - } else { - future.complete(response.data().clientInstanceId()); - } - } - - @Override - void handleFailure(Throwable throwable) { - future.completeExceptionally(throwable); - } - }, now); - - try { - clientInstanceId = future.get(); - } catch (Exception e) { - log.error("Error occurred while fetching client instance id", e); - throw new KafkaException("Error occurred while fetching client instance id", e); - } - + clientInstanceId = ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout); return clientInstanceId; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1b12ae3d4b47b..8297a5aaed637 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1427,6 +1427,46 @@ public void resume(Collection partitions) { delegate.resume(partitions); } + /** + * Add the provided application metric for subscription. 
+ * This metric will be added to this client's metrics + * that are available for subscription and sent as + * telemetry data to the broker. + * The provided metric must map to an OTLP metric data point + * type in the OpenTelemetry v1 metrics protobuf message types. + * Specifically, the metric should be one of the following: + *
<ul>
+ * <li>
+ * `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
+ * </li>
+ * <li>
+ * `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
+ * </li>
+ * </ul>
+ * Metrics not matching these types are silently ignored. + * Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry. + * + * @param metric The application metric to register + */ + @Override + public void registerMetricForSubscription(KafkaMetric metric) { + delegate.registerMetricForSubscription(metric); + } + + /** + * Remove the provided application metric for subscription. + * This metric is removed from this client's metrics + * and will not be available for subscription any longer. + * Executing this method with a metric that has not been registered is a + * benign operation and does not result in any action taken (no-op). + * + * @param metric The application metric to remove + */ + @Override + public void unregisterMetricFromSubscription(KafkaMetric metric) { + delegate.unregisterMetricFromSubscription(metric); + } + /** * Get the set of partitions that were previously paused by a call to {@link #pause(Collection)}. 
* @@ -1706,14 +1746,4 @@ KafkaConsumerMetrics kafkaConsumerMetrics() { boolean updateAssignmentMetadataIfNeeded(final Timer timer) { return delegate.updateAssignmentMetadataIfNeeded(timer); } - - @Override - public void registerMetricForSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void unregisterMetricFromSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index fccd69c86b851..972eba8098906 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -647,12 +647,18 @@ private void updateGroupMetadata(final Optional memberEpoch, final Stri @Override public void registerMetricForSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricChange(metric); + } } @Override public void unregisterMetricFromSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricRemoval(metric); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 4a732f0aeb04a..82a9bd2a53bfc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -430,12 +430,18 @@ public void subscribe(Collection topics, ConsumerRebalanceListener liste @Override public void registerMetricForSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricChange(metric); + } } @Override public void unregisterMetricFromSubscription(KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricRemoval(metric); + } } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0b290318b0bd0..e1bd7f03aca80 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -57,6 +57,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.KafkaMetricsContext; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -1300,6 +1301,53 @@ public List partitionsFor(String topic) { return Collections.unmodifiableMap(this.metrics.metrics()); } + + /** + * Add the provided application metric for subscription. + * This metric will be added to this client's metrics + * that are available for subscription and sent as + * telemetry data to the broker. 
+ * The provided metric must map to an OTLP metric data point + * type in the OpenTelemetry v1 metrics protobuf message types. + * Specifically, the metric should be one of the following: + *
<ul>
+ * <li>
+ * `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
+ * </li>
+ * <li>
+ * `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
+ * </li>
+ * </ul>
+ * Metrics not matching these types are silently ignored. + * Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry. + * + * @param metric The application metric to register + */ + @Override + public void registerMetricForSubscription(KafkaMetric metric) { + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricChange(metric); + } + } + + /** + * Remove the provided application metric for subscription. + * This metric is removed from this client's metrics + * and will not be available for subscription any longer. + * Executing this method with a metric that has not been registered is a + * benign operation and does not result in any action taken (no-op). + * + * @param metric The application metric to remove + */ + @Override + public void unregisterMetricFromSubscription(KafkaMetric metric) { + if (clientTelemetryReporter.isPresent()) { + ClientTelemetryReporter reporter = clientTelemetryReporter.get(); + reporter.metricRemoval(metric); + } + } + /** * Determines the client's unique client instance ID used for telemetry. This ID is unique to * this specific client instance and will not change after it is initially generated. 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 971ea0194c397..3d278c40cb067 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Time; @@ -36,6 +37,7 @@ import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.List; @@ -71,6 +73,7 @@ public class MockProducer implements Producer { private boolean producerFenced; private boolean sentOffsets; private long commitCount = 0L; + private final List addedMetrics = new ArrayList<>(); public RuntimeException initTransactionException = null; public RuntimeException beginTransactionException = null; @@ -334,7 +337,7 @@ public synchronized Future send(ProducerRecord record, Cal keySerializer.serialize(record.topic(), record.key()); valueSerializer.serialize(record.topic(), record.value()); } - + TopicPartition topicPartition = new TopicPartition(record.topic(), partition); ProduceRequestResult result = new ProduceRequestResult(topicPartition); FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, @@ -607,4 +610,17 @@ public void complete(RuntimeException e) { } } + public List addedMetrics() { + return Collections.unmodifiableList(addedMetrics); + } + + @Override + public void registerMetricForSubscription(KafkaMetric metric) { + addedMetrics.add(metric); + } + + 
@Override + public void unregisterMetricFromSubscription(KafkaMetric metric) { + addedMetrics.remove(metric); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 2579fa0eeb9f2..87e9d6042eeb8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.metrics.KafkaMetric; import java.io.Closeable; import java.time.Duration; @@ -71,6 +72,16 @@ void sendOffsetsToTransaction(Map offsets, */ void abortTransaction() throws ProducerFencedException; + /** + * @see KafkaProducer#registerMetricForSubscription(KafkaMetric) + */ + void registerMetricForSubscription(KafkaMetric metric); + + /** + * @see KafkaProducer#unregisterMetricFromSubscription(KafkaMetric) + */ + void unregisterMetricFromSubscription(KafkaMetric metric); + /** * See {@link KafkaProducer#send(ProducerRecord)} */ diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index a864503e1c928..bf7a624cc4ea8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -121,7 +121,6 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; -import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.message.InitProducerIdResponseData; @@ -198,8 +197,6 @@ import org.apache.kafka.common.requests.ElectLeadersResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; -import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; -import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; @@ -235,6 +232,8 @@ import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -248,6 +247,8 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.MockedStatic; +import org.mockito.internal.stubbing.answers.CallsRealMethods; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -300,10 +301,14 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import 
static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; /** * A unit test for KafkaAdminClient. @@ -469,6 +474,35 @@ public void testExplicitlyEnableJmxReporter() { admin.close(); } + @Test + public void testExplicitlyEnableTelemetryReporter() { + Properties props = new Properties(); + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true"); + try (KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props)) { + List telemetryReporterList = admin.metrics.reporters().stream() + .filter(r -> r instanceof ClientTelemetryReporter) + .map(r -> (ClientTelemetryReporter) r) + .collect(Collectors.toList()); + + assertEquals(telemetryReporterList.size(), 1); + } + } + + @Test + public void testTelemetryReporterIsDisabledByDefault() { + Properties props = new Properties(); + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + try (KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props)) { + List telemetryReporterList = admin.metrics.reporters().stream() + .filter(r -> r instanceof ClientTelemetryReporter) + .map(r -> (ClientTelemetryReporter) r) + .collect(Collectors.toList()); + + assertTrue(telemetryReporterList.isEmpty()); + } + } + private static Cluster mockCluster(int numNodes, int controllerIndex) { HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; i++) @@ -8333,18 +8367,21 @@ public void testFenceProducers() throws Exception { @Test public void testClientInstanceId() { - try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) { - Uuid expected = Uuid.randomUuid(); - - GetTelemetrySubscriptionsResponseData responseData = - new GetTelemetrySubscriptionsResponseData().setClientInstanceId(expected).setErrorCode(Errors.NONE.code()); - env.kafkaClient().prepareResponse( - request -> request 
instanceof GetTelemetrySubscriptionsRequest, - new GetTelemetrySubscriptionsResponse(responseData)); - - Uuid result = env.adminClient().clientInstanceId(Duration.ofSeconds(1)); - assertEquals(expected, result); + try (MockedStatic mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) { + ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class); + clientTelemetryReporter.configure(any()); + mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter)); + + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) { + ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class); + Uuid expectedUuid = Uuid.randomUuid(); + when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender); + when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid)); + + Uuid result = env.adminClient().clientInstanceId(Duration.ofSeconds(1)); + assertEquals(expectedUuid, result); + } } } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java new file mode 100644 index 0000000000000..9b1854811b716 --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetry; +import org.apache.kafka.server.telemetry.ClientTelemetryPayload; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; +import org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData; +import org.apache.kafka.streams.ClientInstanceIds; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tools.ClientMetricsCommand; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Timeout(600) +@Tag("integration") +public class KafkaStreamsTelemetryIntegrationTest { + private String appId; + 
private String inputTopicTwoPartitions; + private String outputTopicTwoPartitions; + private String inputTopicOnePartition; + private String outputTopicOnePartition; + private Properties streamsApplicationProperties = new Properties(); + private Properties streamsSecondApplicationProperties = new Properties(); + + private static EmbeddedKafkaCluster cluster; + private static final List> INTERCEPTING_CONSUMERS = new ArrayList<>(); + private static final List INTERCEPTING_ADMIN_CLIENTS = new ArrayList<>(); + private static final int NUM_BROKERS = 3; + private static final int FIRST_INSTANCE_CLIENT = 0; + private static final int SECOND_INSTANCE_CLIENT = 1; + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsTelemetryIntegrationTest.class); + + + @BeforeAll + public static void startCluster() throws IOException { + final Properties properties = new Properties(); + properties.put("metric.reporters", TelemetryPlugin.class.getName()); + cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties); + cluster.start(); + } + + @BeforeEach + public void setUp(final TestInfo testInfo) throws InterruptedException { + appId = safeUniqueTestName(testInfo); + inputTopicTwoPartitions = appId + "-input-two"; + outputTopicTwoPartitions = appId + "-output-two"; + inputTopicOnePartition = appId + "-input-one"; + outputTopicOnePartition = appId + "-output-one"; + cluster.createTopic(inputTopicTwoPartitions, 2, 1); + cluster.createTopic(outputTopicTwoPartitions, 2, 1); + cluster.createTopic(inputTopicOnePartition, 1, 1); + cluster.createTopic(outputTopicOnePartition, 1, 1); + } + + @AfterAll + public static void closeCluster() { + cluster.stop(); + } + + @AfterEach + public void tearDown() throws Exception { + INTERCEPTING_CONSUMERS.clear(); + INTERCEPTING_ADMIN_CLIENTS.clear(); + IntegrationTestUtils.purgeLocalStreamsState(streamsApplicationProperties); + if (!streamsSecondApplicationProperties.isEmpty()) { + 
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties); + } + } + + @ParameterizedTest + @ValueSource(strings = {"INFO", "DEBUG", "TRACE"}) + @DisplayName("End-to-end test validating metrics pushed to broker") + public void shouldPushMetricsToBroker(final String recordingLevel) throws Exception { + streamsApplicationProperties = props(true); + streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel); + final Topology topology = simpleTopology(); + subscribeForStreamsMetrics(); + try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ofSeconds(60)); + final Uuid adminInstanceId = clientInstanceIds.adminInstanceId(); + + final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream() + .filter(entry -> !entry.getKey().endsWith("-restore-consumer") + && !entry.getKey().endsWith("GlobalStreamThread")) + .map(Map.Entry::getValue) + .findFirst().orElseThrow(); + assertNotNull(adminInstanceId); + assertNotNull(mainConsumerInstanceId); + LOG.info("Main consumer instance id {}", mainConsumerInstanceId); + + TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).isEmpty(), + 30_000, + "Never received subscribed metrics"); + final List expectedMetrics = streams.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> { + final String name = mn.name().replace('-', '.'); + final String group = mn.group().replace("-metrics", "").replace('-', '.'); + return "org.apache.kafka." + group + "." 
+ name; + }).sorted().collect(Collectors.toList()); + final List actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId)); + assertEquals(expectedMetrics, actualMetrics); + + TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(), + 30_000, + "Never received subscribed metrics"); + final List actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId); + final List expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads"); + assertEquals(expectedInstanceMetrics, actualInstanceMetrics); + } + } + + @ParameterizedTest + @MethodSource("singleAndMultiTaskParameters") + @DisplayName("Streams metrics should get passed to Admin and Consumer") + public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception { + streamsApplicationProperties = props(stateUpdaterEnabled); + final Topology topology = topologyType.equals("simple") ? 
simpleTopology() : complexTopology(); + + try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + + final List streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); + + final List streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList()); + + + + final List consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList()); + final List adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList()); + + + assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size()); + consumerPassedStreamThreadMetricNames.forEach(metricName -> assertTrue(streamsThreadMetrics.contains(metricName), "Streams metrics doesn't contain " + metricName)); + + assertEquals(streamsClientMetrics.size(), adminPassedStreamClientMetricNames.size()); + adminPassedStreamClientMetricNames.forEach(metricName -> assertTrue(streamsClientMetrics.contains(metricName), "Client metrics doesn't contain " + metricName)); + } + } + + @ParameterizedTest + @MethodSource("multiTaskParameters") + @DisplayName("Correct streams metrics should get passed with dynamic membership") + public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterEnabled) throws Exception { + streamsApplicationProperties = props(stateUpdaterEnabled); + streamsApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1"); + streamsApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, 
appId + "-ks1"); + + + streamsSecondApplicationProperties = props(stateUpdaterEnabled); + streamsSecondApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2"); + streamsSecondApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2"); + + + final Topology topology = complexTopology(); + try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) { + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne); + + final List streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + + final List consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName) + .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + + /* + With only one instance, Kafka Streams should register task metrics for all tasks 0_0, 0_1, 1_0, 1_1 + */ + final List streamTaskIds = getTaskIdsAsStrings(streamsOne); + final long consumerPassedTaskMetricCount = consumerPassedStreamTaskMetricNames.stream().filter(metricName -> streamTaskIds.contains(metricName.tags().get("task-id"))).count(); + assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size()); + assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size()); + + + try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) { + streamsTwo.start(); + waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(), + IntegrationTestUtils.DEFAULT_TIMEOUT, + () -> "Kafka Streams one or two never transitioned to a RUNNING state."); + + /* + Now with 2 instances, the tasks will get split amongst both Kafka Streams applications + */ + final List 
streamOneTaskIds = getTaskIdsAsStrings(streamsOne); + final List streamTwoTasksIds = getTaskIdsAsStrings(streamsTwo); + + final List streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + final List streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + + final List consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT) + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + final List consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT) + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + + final List streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + final List streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + + final List consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT) + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + final List consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT) + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + /* + Confirm pre-existing KafkaStreams instance one only passes metrics for 
its tasks and has no metrics for previous tasks + */ + final long consumerOneStreamOneTaskCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count(); + final long consumerOneStateMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count(); + final long consumerOneTaskTwoMetricCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count(); + final long consumerOneStateTwoMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count(); + + /* + Confirm new KafkaStreams instance only passes metrics for the newly assigned tasks + */ + final long consumerTwoStreamTwoTaskCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count(); + final long consumerTwoStateMetricCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count(); + final long consumerTwoTaskOneMetricCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count(); + final long consumerTwoStateMetricOneCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count(); + + assertEquals(streamsOneTaskMetrics.size(), consumerOneStreamOneTaskCount); + assertEquals(streamsOneStateMetrics.size(), consumerOneStateMetricCount); + assertEquals(0, consumerOneTaskTwoMetricCount); + assertEquals(0, consumerOneStateTwoMetricCount); + + assertEquals(streamsTwoTaskMetrics.size(), consumerTwoStreamTwoTaskCount); + assertEquals(streamsTwoStateMetrics.size(), consumerTwoStateMetricCount); + assertEquals(0, 
consumerTwoTaskOneMetricCount); + assertEquals(0, consumerTwoStateMetricOneCount); + } + } + } + + @Test + @DisplayName("Streams metrics should not be visible in client metrics") + public void passedMetricsShouldNotLeakIntoClientMetrics() throws Exception { + streamsApplicationProperties = props(true); + final Topology topology = complexTopology(); + + try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + + final List streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); + + final List streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList()); + + final Map embeddedConsumerMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).metrics(); + final Map embeddedAdminMetrics = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).metrics(); + + streamsThreadMetrics.forEach(metricName -> assertFalse(embeddedConsumerMetrics.containsKey(metricName), "Stream thread metric found in client metrics" + metricName)); + streamsClientMetrics.forEach(metricName -> assertFalse(embeddedAdminMetrics.containsKey(metricName), "Stream client metric found in client metrics" + metricName)); + } + } + + private void subscribeForStreamsMetrics() throws Exception { + final Properties clientProps = new Properties(); + clientProps.put("bootstrap.servers", cluster.bootstrapServers()); + try (final ClientMetricsCommand.ClientMetricsService clientMetricsService = new ClientMetricsCommand.ClientMetricsService(clientProps)) { + final String[] metricsSubscriptionParameters = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--metrics", "org.apache.kafka.stream", "--alter", "--name", "streams-task-metrics-subscription", 
"--interval", "1000"}; + final ClientMetricsCommand.ClientMetricsCommandOptions commandOptions = new ClientMetricsCommand.ClientMetricsCommandOptions(metricsSubscriptionParameters); + clientMetricsService.alterClientMetrics(commandOptions); + } + } + private List getTaskIdsAsStrings(final KafkaStreams streams) { + return streams.metadataForLocalThreads().stream() + .flatMap(threadMeta -> threadMeta.activeTasks().stream() + .map(taskMeta -> taskMeta.taskId().toString())) + .collect(Collectors.toList()); + } + + private static Stream singleAndMultiTaskParameters() { + return Stream.of(Arguments.of("simple", true), + Arguments.of("simple", false), + Arguments.of("complex", true), + Arguments.of("complex", false)); + } + + private static Stream multiTaskParameters() { + return Stream.of(Arguments.of(true), + Arguments.of(false)); + } + + private Properties props(final boolean stateUpdaterEnabled) { + return props(mkObjectProperties(mkMap(mkEntry(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled)))); + } + + private Properties props(final Properties extraProperties) { + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.putAll(extraProperties); + return streamsConfiguration; + } + + private Topology 
complexTopology() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopicTwoPartitions, Consumed.with(Serdes.String(), Serdes.String())) + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) + .groupBy((key, value) -> value) + .count() + .toStream().to(outputTopicTwoPartitions, Produced.with(Serdes.String(), Serdes.Long())); + return builder.build(); + } + + private Topology simpleTopology() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopicOnePartition, Consumed.with(Serdes.String(), Serdes.String())) + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) + .to(outputTopicOnePartition, Produced.with(Serdes.String(), Serdes.String())); + return builder.build(); + } + + + public static class TestClientSupplier implements KafkaClientSupplier { + + @Override + public Producer getProducer(final Map config) { + return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); + } + + @Override + public Consumer getConsumer(final Map config) { + final TestingMetricsInterceptingConsumer consumer = new TestingMetricsInterceptingConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + INTERCEPTING_CONSUMERS.add(consumer); + return consumer; + } + + @Override + public Consumer getRestoreConsumer(final Map config) { + return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + + @Override + public Consumer getGlobalConsumer(final Map config) { + return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + + @Override + public Admin getAdmin(final Map config) { + assertTrue((Boolean) config.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG)); + final TestingMetricsInterceptingAdminClient adminClient = new TestingMetricsInterceptingAdminClient(config); + INTERCEPTING_ADMIN_CLIENTS.add(adminClient); + return 
adminClient; + } + } + + public static class TestingMetricsInterceptingConsumer extends KafkaConsumer { + + public List passedMetrics = new ArrayList<>(); + + public TestingMetricsInterceptingConsumer(final Map configs, final Deserializer keyDeserializer, final Deserializer valueDeserializer) { + super(configs, keyDeserializer, valueDeserializer); + } + + @Override + public void registerMetricForSubscription(final KafkaMetric metric) { + passedMetrics.add(metric); + super.registerMetricForSubscription(metric); + } + + @Override + public void unregisterMetricFromSubscription(final KafkaMetric metric) { + passedMetrics.remove(metric); + super.unregisterMetricFromSubscription(metric); + } + } + + public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter, ClientTelemetryReceiver { + + public static final Map> SUBSCRIBED_METRICS = new ConcurrentHashMap<>(); + public TelemetryPlugin() { + } + + @Override + public void init(final List metrics) { + } + + @Override + public void metricChange(final KafkaMetric metric) { + } + + @Override + public void metricRemoval(final KafkaMetric metric) { + } + + @Override + public void close() { + + } + + @Override + public void configure(final Map configs) { + + } + + @Override + public ClientTelemetryReceiver clientReceiver() { + return this; + } + + @Override + public void exportMetrics(final AuthorizableRequestContext context, final ClientTelemetryPayload payload) { + try { + final MetricsData data = MetricsData.parseFrom(payload.data()); + final Uuid clientId = payload.clientInstanceId(); + final List metricNames = data.getResourceMetricsList() + .stream() + .map(rm -> rm.getScopeMetricsList().get(0).getMetrics(0).getName()) + .sorted() + .collect(Collectors.toList()); + LOG.info("Found metrics {} for clientId={}", metricNames, clientId); + SUBSCRIBED_METRICS.put(clientId, metricNames); + } catch (final Exception e) { + e.printStackTrace(System.err); + } + } + } +} diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java index e1cb7411f8082..a01324ef7fd85 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java @@ -427,12 +427,14 @@ public ListShareGroupsResult listShareGroups(final ListShareGroupsOptions option @Override public void registerMetricForSubscription(final KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + passedMetrics.add(metric); + adminDelegate.registerMetricForSubscription(metric); } @Override public void unregisterMetricFromSubscription(final KafkaMetric metric) { - throw new UnsupportedOperationException("not implemented"); + passedMetrics.remove(metric); + adminDelegate.unregisterMetricFromSubscription(metric); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index cb6cbbfb1f404..5a59a79ef5aca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -53,6 +53,7 @@ import org.apache.kafka.streams.errors.UnknownStateStoreException; import org.apache.kafka.streams.internals.ClientInstanceIdsImpl; import org.apache.kafka.streams.internals.metrics.ClientMetrics; +import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter; import org.apache.kafka.streams.processor.StandbyUpdateListener; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; @@ -1016,6 +1017,9 @@ private KafkaStreams(final TopologyMetadata 
topologyMetadata, log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); metrics = createMetrics(applicationConfigs, time, clientId); + final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId); + metrics.addReporter(reporter); + streamsMetrics = new StreamsMetricsImpl( metrics, clientId, diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index ae12240f9976d..d24575a05d88f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1230,7 +1230,7 @@ public class StreamsConfig extends AbstractConfig { private static final Map ADMIN_CLIENT_OVERRIDES; static { final Map tempAdminClientDefaultOverrides = new HashMap<>(); - tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true"); + tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true); ADMIN_CLIENT_OVERRIDES = Collections.unmodifiableMap(tempAdminClientDefaultOverrides); } @@ -1811,7 +1811,6 @@ public Map getGlobalConsumerConfigs(final String clientId) { // add client id with stream client id prefix baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer"); baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); - return baseConsumerProps; } @@ -1857,7 +1856,6 @@ public Map getAdminConfigs(final String clientId) { // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); - return props; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e45021f25c374..e492d180175ef 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -48,6 +48,7 @@ import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.internals.metrics.ClientMetrics; +import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter; import org.apache.kafka.streams.processor.StandbyUpdateListener; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; @@ -87,6 +88,9 @@ public class StreamThread extends Thread implements ProcessingThread { + private static final String THREAD_ID_SUBSTRING = "-StreamThread-"; + private static final String STATE_UPDATER_ID_SUBSTRING = "-StateUpdater-"; + /** * Stream thread states are the possible states that a stream thread can be in. * A thread must only be in one state at a time @@ -367,7 +371,8 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final int threadIdx, final Runnable shutdownErrorHook, final BiConsumer streamsUncaughtExceptionHandler) { - final String threadId = clientId + "-StreamThread-" + threadIdx; + + final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx; final String logPrefix = String.format("stream-thread [%s] ", threadId); final LogContext logContext = new LogContext(logPrefix); @@ -473,6 +478,10 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, taskManager.setMainConsumer(mainConsumer); referenceContainer.mainConsumer = mainConsumer; + final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING); + final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId); + streamsMetrics.metricsRegistry().addReporter(reporter); + final 
StreamThread streamThread = new StreamThread( time, config, @@ -533,7 +542,7 @@ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateU final String clientId, final int threadIdx) { if (stateUpdaterEnabled) { - final String name = clientId + "-StateUpdater-" + threadIdx; + final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; final StateUpdater stateUpdater = new DefaultStateUpdater( name, streamsMetrics.metricsRegistry(), diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 95948de7f4d46..31e755c3b1f8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -383,7 +383,7 @@ public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() { @Test public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() { final Map returnedProps = streamsConfig.getAdminConfigs(clientId); - assertThat(returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG), equalTo("true")); + assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG)); } @Test