diff --git a/build.gradle b/build.gradle
index d062351938a77..44194856dddc9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2639,7 +2639,7 @@ project(':streams') {
}
dependencies {
- api project(':clients')
+ api project(path: ':clients', configuration: 'shadow')
// `org.rocksdb.Options` is part of Kafka Streams public api via `RocksDBConfigSetter`
api libs.rocksDBJni
@@ -2659,6 +2659,7 @@ project(':streams') {
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testImplementation libs.junitPlatformSuiteEngine // supports suite test
+ testImplementation project(':group-coordinator')
testRuntimeOnly project(':streams:test-utils')
testRuntimeOnly runtimeTestLibs
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 77d668c699465..36159347f4dd3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -418,6 +418,11 @@
+
+
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 30fdac4687d96..7c7cbb99baaa3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -153,7 +153,6 @@
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
-import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -228,8 +227,6 @@
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
-import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
-import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -254,6 +251,8 @@
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.KafkaThread;
@@ -409,6 +408,7 @@ public class KafkaAdminClient extends AdminClient {
private final boolean clientTelemetryEnabled;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final AdminFetchMetricsManager adminFetchMetricsManager;
+    private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
/**
* The telemetry requests client instance id.
@@ -529,6 +529,7 @@ static KafkaAdminClient createInternal(
String clientId = generateClientId(config);
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = createLogContext(clientId);
+        Optional<ClientTelemetryReporter> clientTelemetryReporter;
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
@@ -540,6 +541,8 @@ static KafkaAdminClient createInternal(
adminAddresses.usingBootstrapControllers());
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds());
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
+ clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
+ clientTelemetryReporter.ifPresent(reporters::add);
Map metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -557,10 +560,13 @@ static KafkaAdminClient createInternal(
time,
1,
(int) TimeUnit.HOURS.toMillis(1),
+ null,
metadataManager.updater(),
- (hostResolver == null) ? new DefaultHostResolver() : hostResolver);
+ (hostResolver == null) ? new DefaultHostResolver() : hostResolver,
+ null,
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
- timeoutProcessorFactory, logContext);
+ timeoutProcessorFactory, logContext, clientTelemetryReporter);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
closeQuietly(networkClient, "NetworkClient");
@@ -575,12 +581,13 @@ static KafkaAdminClient createInternal(AdminClientConfig config,
Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
-
+        Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
+
try {
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
- client, null, logContext);
+ client, null, logContext, clientTelemetryReporter);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
throw new KafkaException("Failed to create new KafkaAdminClient", exc);
@@ -598,7 +605,8 @@ private KafkaAdminClient(AdminClientConfig config,
Metrics metrics,
KafkaClient client,
TimeoutProcessorFactory timeoutProcessorFactory,
- LogContext logContext) {
+ LogContext logContext,
+                             Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
this.logContext = logContext;
@@ -622,6 +630,9 @@ private KafkaAdminClient(AdminClientConfig config,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
+        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
+ this.clientTelemetryReporter = clientTelemetryReporter;
+ this.clientTelemetryReporter.ifPresent(reporters::add);
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
config.logUnused();
@@ -664,6 +675,8 @@ public void close(Duration timeout) {
long now = time.milliseconds();
long newHardShutdownTimeMs = now + waitTimeMs;
long prev = INVALID_SHUTDOWN_TIME;
+ clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
+ metrics.close();
while (true) {
if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
if (prev == INVALID_SHUTDOWN_TIME) {
@@ -4272,12 +4285,18 @@ private KafkaFutureImpl> getMembersFromGroup(String groupId
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricChange(metric);
+ }
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricRemoval(metric);
+ }
}
@Override
@@ -5050,47 +5069,16 @@ public Uuid clientInstanceId(Duration timeout) {
throw new IllegalArgumentException("The timeout cannot be negative.");
}
- if (!clientTelemetryEnabled) {
+        if (clientTelemetryReporter.isEmpty()) {
            throw new IllegalStateException("Telemetry is not enabled. Set config `" + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
        }
if (clientInstanceId != null) {
return clientInstanceId;
}
- final long now = time.milliseconds();
- final KafkaFutureImpl future = new KafkaFutureImpl<>();
- runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()),
- new LeastLoadedNodeProvider()) {
-
- @Override
- GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) {
- return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
- }
-
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse;
- if (response.error() != Errors.NONE) {
- future.completeExceptionally(response.error().exception());
- } else {
- future.complete(response.data().clientInstanceId());
- }
- }
-
- @Override
- void handleFailure(Throwable throwable) {
- future.completeExceptionally(throwable);
- }
- }, now);
-
- try {
- clientInstanceId = future.get();
- } catch (Exception e) {
- log.error("Error occurred while fetching client instance id", e);
- throw new KafkaException("Error occurred while fetching client instance id", e);
- }
-
+ clientInstanceId = ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
return clientInstanceId;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1b12ae3d4b47b..8297a5aaed637 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1427,6 +1427,46 @@ public void resume(Collection partitions) {
delegate.resume(partitions);
}
+ /**
+ * Add the provided application metric for subscription.
+ * This metric will be added to this client's metrics
+ * that are available for subscription and sent as
+ * telemetry data to the broker.
+ * The provided metric must map to an OTLP metric data point
+ * type in the OpenTelemetry v1 metrics protobuf message types.
+ * Specifically, the metric should be one of the following:
+ *
+ * -
+ * `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
+ *
+ * -
+ * `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
+ *
+ *
+ * Metrics not matching these types are silently ignored.
+ * Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
+ *
+ * @param metric The application metric to register
+ */
+ @Override
+ public void registerMetricForSubscription(KafkaMetric metric) {
+ delegate.registerMetricForSubscription(metric);
+ }
+
+ /**
+ * Remove the provided application metric for subscription.
+ * This metric is removed from this client's metrics
+ * and will not be available for subscription any longer.
+ * Executing this method with a metric that has not been registered is a
+ * benign operation and does not result in any action taken (no-op).
+ *
+ * @param metric The application metric to remove
+ */
+ @Override
+ public void unregisterMetricFromSubscription(KafkaMetric metric) {
+ delegate.unregisterMetricFromSubscription(metric);
+ }
+
/**
* Get the set of partitions that were previously paused by a call to {@link #pause(Collection)}.
*
@@ -1706,14 +1746,4 @@ KafkaConsumerMetrics kafkaConsumerMetrics() {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return delegate.updateAssignmentMetadataIfNeeded(timer);
}
-
- @Override
- public void registerMetricForSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
- }
-
- @Override
- public void unregisterMetricFromSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index fccd69c86b851..972eba8098906 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -647,12 +647,18 @@ private void updateGroupMetadata(final Optional memberEpoch, final Stri
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricChange(metric);
+ }
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricRemoval(metric);
+ }
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 4a732f0aeb04a..82a9bd2a53bfc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -430,12 +430,18 @@ public void subscribe(Collection topics, ConsumerRebalanceListener liste
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricChange(metric);
+ }
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricRemoval(metric);
+ }
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 0b290318b0bd0..e1bd7f03aca80 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -57,6 +57,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -1300,6 +1301,53 @@ public List partitionsFor(String topic) {
return Collections.unmodifiableMap(this.metrics.metrics());
}
+
+ /**
+ * Add the provided application metric for subscription.
+ * This metric will be added to this client's metrics
+ * that are available for subscription and sent as
+ * telemetry data to the broker.
+ * The provided metric must map to an OTLP metric data point
+ * type in the OpenTelemetry v1 metrics protobuf message types.
+ * Specifically, the metric should be one of the following:
+ *
+ * -
+ * `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
+ *
+ * -
+ * `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
+ *
+ *
+ * Metrics not matching these types are silently ignored.
+ * Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
+ *
+ * @param metric The application metric to register
+ */
+ @Override
+ public void registerMetricForSubscription(KafkaMetric metric) {
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricChange(metric);
+ }
+ }
+
+ /**
+ * Remove the provided application metric for subscription.
+ * This metric is removed from this client's metrics
+ * and will not be available for subscription any longer.
+ * Executing this method with a metric that has not been registered is a
+ * benign operation and does not result in any action taken (no-op).
+ *
+ * @param metric The application metric to remove
+ */
+ @Override
+ public void unregisterMetricFromSubscription(KafkaMetric metric) {
+ if (clientTelemetryReporter.isPresent()) {
+ ClientTelemetryReporter reporter = clientTelemetryReporter.get();
+ reporter.metricRemoval(metric);
+ }
+ }
+
/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* this specific client instance and will not change after it is initially generated.
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 971ea0194c397..3d278c40cb067 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -29,6 +29,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
@@ -36,6 +37,7 @@
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
@@ -71,6 +73,7 @@ public class MockProducer implements Producer {
private boolean producerFenced;
private boolean sentOffsets;
private long commitCount = 0L;
+    private final List<KafkaMetric> addedMetrics = new ArrayList<>();
public RuntimeException initTransactionException = null;
public RuntimeException beginTransactionException = null;
@@ -334,7 +337,7 @@ public synchronized Future send(ProducerRecord record, Cal
keySerializer.serialize(record.topic(), record.key());
valueSerializer.serialize(record.topic(), record.value());
}
-
+
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
@@ -607,4 +610,17 @@ public void complete(RuntimeException e) {
}
}
+    public List<KafkaMetric> addedMetrics() {
+ return Collections.unmodifiableList(addedMetrics);
+ }
+
+ @Override
+ public void registerMetricForSubscription(KafkaMetric metric) {
+ addedMetrics.add(metric);
+ }
+
+ @Override
+ public void unregisterMetricFromSubscription(KafkaMetric metric) {
+ addedMetrics.remove(metric);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 2579fa0eeb9f2..87e9d6042eeb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -24,6 +24,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.metrics.KafkaMetric;
import java.io.Closeable;
import java.time.Duration;
@@ -71,6 +72,16 @@ void sendOffsetsToTransaction(Map offsets,
*/
void abortTransaction() throws ProducerFencedException;
+ /**
+ * @see KafkaProducer#registerMetricForSubscription(KafkaMetric)
+ */
+ void registerMetricForSubscription(KafkaMetric metric);
+
+ /**
+ * @see KafkaProducer#unregisterMetricFromSubscription(KafkaMetric)
+ */
+ void unregisterMetricFromSubscription(KafkaMetric metric);
+
/**
* See {@link KafkaProducer#send(ProducerRecord)}
*/
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a864503e1c928..bf7a624cc4ea8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -121,7 +121,6 @@
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
-import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.InitProducerIdResponseData;
@@ -198,8 +197,6 @@
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
-import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -235,6 +232,8 @@
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -248,6 +247,8 @@
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,10 +301,14 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
/**
* A unit test for KafkaAdminClient.
@@ -469,6 +474,35 @@ public void testExplicitlyEnableJmxReporter() {
admin.close();
}
+ @Test
+ public void testExplicitlyEnableTelemetryReporter() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
+ try (KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props)) {
+            List<ClientTelemetryReporter> telemetryReporterList = admin.metrics.reporters().stream()
+ .filter(r -> r instanceof ClientTelemetryReporter)
+ .map(r -> (ClientTelemetryReporter) r)
+ .collect(Collectors.toList());
+
+            assertEquals(1, telemetryReporterList.size());
+ }
+ }
+
+ @Test
+ public void testTelemetryReporterIsDisabledByDefault() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ try (KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props)) {
+            List<ClientTelemetryReporter> telemetryReporterList = admin.metrics.reporters().stream()
+ .filter(r -> r instanceof ClientTelemetryReporter)
+ .map(r -> (ClientTelemetryReporter) r)
+ .collect(Collectors.toList());
+
+ assertTrue(telemetryReporterList.isEmpty());
+ }
+ }
+
private static Cluster mockCluster(int numNodes, int controllerIndex) {
HashMap nodes = new HashMap<>();
for (int i = 0; i < numNodes; i++)
@@ -8333,18 +8367,21 @@ public void testFenceProducers() throws Exception {
@Test
public void testClientInstanceId() {
- try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) {
- Uuid expected = Uuid.randomUuid();
-
- GetTelemetrySubscriptionsResponseData responseData =
- new GetTelemetrySubscriptionsResponseData().setClientInstanceId(expected).setErrorCode(Errors.NONE.code());
- env.kafkaClient().prepareResponse(
- request -> request instanceof GetTelemetrySubscriptionsRequest,
- new GetTelemetrySubscriptionsResponse(responseData));
-
- Uuid result = env.adminClient().clientInstanceId(Duration.ofSeconds(1));
- assertEquals(expected, result);
+        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+ ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
+ clientTelemetryReporter.configure(any());
+ mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+ try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) {
+ ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
+ Uuid expectedUuid = Uuid.randomUuid();
+ when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
+ when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
+
+ Uuid result = env.adminClient().clientInstanceId(Duration.ofSeconds(1));
+ assertEquals(expectedUuid, result);
+ }
}
}
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
new file mode 100644
index 0000000000000..9b1854811b716
--- /dev/null
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetry;
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData;
+import org.apache.kafka.streams.ClientInstanceIds;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.ClientMetricsCommand;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(600)
+@Tag("integration")
+public class KafkaStreamsTelemetryIntegrationTest {
+ private String appId;
+ private String inputTopicTwoPartitions;
+ private String outputTopicTwoPartitions;
+ private String inputTopicOnePartition;
+ private String outputTopicOnePartition;
+ private Properties streamsApplicationProperties = new Properties();
+ private Properties streamsSecondApplicationProperties = new Properties();
+
+ private static EmbeddedKafkaCluster cluster;
+ private static final List<TestingMetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
+ private static final List<TestingMetricsInterceptingAdminClient> INTERCEPTING_ADMIN_CLIENTS = new ArrayList<>();
+ private static final int NUM_BROKERS = 3;
+ private static final int FIRST_INSTANCE_CLIENT = 0;
+ private static final int SECOND_INSTANCE_CLIENT = 1;
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsTelemetryIntegrationTest.class);
+
+
+ @BeforeAll
+ public static void startCluster() throws IOException {
+ final Properties properties = new Properties();
+ properties.put("metric.reporters", TelemetryPlugin.class.getName());
+ cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties);
+ cluster.start();
+ }
+
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws InterruptedException {
+ appId = safeUniqueTestName(testInfo);
+ inputTopicTwoPartitions = appId + "-input-two";
+ outputTopicTwoPartitions = appId + "-output-two";
+ inputTopicOnePartition = appId + "-input-one";
+ outputTopicOnePartition = appId + "-output-one";
+ cluster.createTopic(inputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(outputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(inputTopicOnePartition, 1, 1);
+ cluster.createTopic(outputTopicOnePartition, 1, 1);
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ cluster.stop();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ INTERCEPTING_CONSUMERS.clear();
+ INTERCEPTING_ADMIN_CLIENTS.clear();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsApplicationProperties);
+ if (!streamsSecondApplicationProperties.isEmpty()) {
+ IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"INFO", "DEBUG", "TRACE"})
+ @DisplayName("End-to-end test validating metrics pushed to broker")
+ public void shouldPushMetricsToBroker(final String recordingLevel) throws Exception {
+ streamsApplicationProperties = props(true);
+ streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
+ final Topology topology = simpleTopology();
+ subscribeForStreamsMetrics();
+ try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+ final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ofSeconds(60));
+ final Uuid adminInstanceId = clientInstanceIds.adminInstanceId();
+
+ final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream()
+ .filter(entry -> !entry.getKey().endsWith("-restore-consumer")
+ && !entry.getKey().endsWith("GlobalStreamThread"))
+ .map(Map.Entry::getValue)
+ .findFirst().orElseThrow();
+ assertNotNull(adminInstanceId);
+ assertNotNull(mainConsumerInstanceId);
+ LOG.info("Main consumer instance id {}", mainConsumerInstanceId);
+
+ TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).isEmpty(),
+ 30_000,
+ "Never received subscribed metrics");
+ final List<String> expectedMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> {
+ final String name = mn.name().replace('-', '.');
+ final String group = mn.group().replace("-metrics", "").replace('-', '.');
+ return "org.apache.kafka." + group + "." + name;
+ }).sorted().collect(Collectors.toList());
+ final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
+ assertEquals(expectedMetrics, actualMetrics);
+
+ TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(),
+ 30_000,
+ "Never received subscribed metrics");
+ final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
+ final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads");
+ assertEquals(expectedInstanceMetrics, actualInstanceMetrics);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("singleAndMultiTaskParameters")
+ @DisplayName("Streams metrics should get passed to Admin and Consumer")
+ public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception {
+ streamsApplicationProperties = props(stateUpdaterEnabled);
+ final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
+
+ try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+
+ final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+
+
+
+ final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+ final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+
+
+ assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
+ consumerPassedStreamThreadMetricNames.forEach(metricName -> assertTrue(streamsThreadMetrics.contains(metricName), "Streams metrics doesn't contain " + metricName));
+
+ assertEquals(streamsClientMetrics.size(), adminPassedStreamClientMetricNames.size());
+ adminPassedStreamClientMetricNames.forEach(metricName -> assertTrue(streamsClientMetrics.contains(metricName), "Client metrics doesn't contain " + metricName));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("multiTaskParameters")
+ @DisplayName("Correct streams metrics should get passed with dynamic membership")
+ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterEnabled) throws Exception {
+ streamsApplicationProperties = props(stateUpdaterEnabled);
+ streamsApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1");
+ streamsApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks1");
+
+
+ streamsSecondApplicationProperties = props(stateUpdaterEnabled);
+ streamsSecondApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2");
+ streamsSecondApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2");
+
+
+ final Topology topology = complexTopology();
+ try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) {
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
+
+ final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+ final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+ /*
+ With only one instance, Kafka Streams should register task metrics for all tasks 0_0, 0_1, 1_0, 1_1
+ */
+ final List<String> streamTaskIds = getTaskIdsAsStrings(streamsOne);
+ final long consumerPassedTaskMetricCount = consumerPassedStreamTaskMetricNames.stream().filter(metricName -> streamTaskIds.contains(metricName.tags().get("task-id"))).count();
+ assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size());
+ assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size());
+
+
+ try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) {
+ streamsTwo.start();
+ waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams one or two never transitioned to a RUNNING state.");
+
+ /*
+ Now with 2 instances, the tasks will get split amongst both Kafka Streams applications
+ */
+ final List<String> streamOneTaskIds = getTaskIdsAsStrings(streamsOne);
+ final List<String> streamTwoTasksIds = getTaskIdsAsStrings(streamsTwo);
+
+ final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+
+ final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+
+ final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+
+ final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+ /*
+ Confirm pre-existing KafkaStreams instance one only passes metrics for its tasks and has no metrics for previous tasks
+ */
+ final long consumerOneStreamOneTaskCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerOneStateMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerOneTaskTwoMetricCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerOneStateTwoMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+
+ /*
+ Confirm new KafkaStreams instance only passes metrics for the newly assigned tasks
+ */
+ final long consumerTwoStreamTwoTaskCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerTwoStateMetricCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerTwoTaskOneMetricCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerTwoStateMetricOneCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+
+ assertEquals(streamsOneTaskMetrics.size(), consumerOneStreamOneTaskCount);
+ assertEquals(streamsOneStateMetrics.size(), consumerOneStateMetricCount);
+ assertEquals(0, consumerOneTaskTwoMetricCount);
+ assertEquals(0, consumerOneStateTwoMetricCount);
+
+ assertEquals(streamsTwoTaskMetrics.size(), consumerTwoStreamTwoTaskCount);
+ assertEquals(streamsTwoStateMetrics.size(), consumerTwoStateMetricCount);
+ assertEquals(0, consumerTwoTaskOneMetricCount);
+ assertEquals(0, consumerTwoStateMetricOneCount);
+ }
+ }
+ }
+
+ @Test
+ @DisplayName("Streams metrics should not be visible in client metrics")
+ public void passedMetricsShouldNotLeakIntoClientMetrics() throws Exception {
+ streamsApplicationProperties = props(true);
+ final Topology topology = complexTopology();
+
+ try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+
+ final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+
+ final Map<MetricName, ? extends Metric> embeddedConsumerMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).metrics();
+ final Map<MetricName, ? extends Metric> embeddedAdminMetrics = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).metrics();
+
+ streamsThreadMetrics.forEach(metricName -> assertFalse(embeddedConsumerMetrics.containsKey(metricName), "Stream thread metric found in client metrics" + metricName));
+ streamsClientMetrics.forEach(metricName -> assertFalse(embeddedAdminMetrics.containsKey(metricName), "Stream client metric found in client metrics" + metricName));
+ }
+ }
+
+ private void subscribeForStreamsMetrics() throws Exception {
+ final Properties clientProps = new Properties();
+ clientProps.put("bootstrap.servers", cluster.bootstrapServers());
+ try (final ClientMetricsCommand.ClientMetricsService clientMetricsService = new ClientMetricsCommand.ClientMetricsService(clientProps)) {
+ final String[] metricsSubscriptionParameters = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--metrics", "org.apache.kafka.stream", "--alter", "--name", "streams-task-metrics-subscription", "--interval", "1000"};
+ final ClientMetricsCommand.ClientMetricsCommandOptions commandOptions = new ClientMetricsCommand.ClientMetricsCommandOptions(metricsSubscriptionParameters);
+ clientMetricsService.alterClientMetrics(commandOptions);
+ }
+ }
+ private List<String> getTaskIdsAsStrings(final KafkaStreams streams) {
+ return streams.metadataForLocalThreads().stream()
+ .flatMap(threadMeta -> threadMeta.activeTasks().stream()
+ .map(taskMeta -> taskMeta.taskId().toString()))
+ .collect(Collectors.toList());
+ }
+
+ private static Stream<Arguments> singleAndMultiTaskParameters() {
+ return Stream.of(Arguments.of("simple", true),
+ Arguments.of("simple", false),
+ Arguments.of("complex", true),
+ Arguments.of("complex", false));
+ }
+
+ private static Stream<Arguments> multiTaskParameters() {
+ return Stream.of(Arguments.of(true),
+ Arguments.of(false));
+ }
+
+ private Properties props(final boolean stateUpdaterEnabled) {
+ return props(mkObjectProperties(mkMap(mkEntry(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled))));
+ }
+
+ private Properties props(final Properties extraProperties) {
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.putAll(extraProperties);
+ return streamsConfiguration;
+ }
+
+ private Topology complexTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream(inputTopicTwoPartitions, Consumed.with(Serdes.String(), Serdes.String()))
+ .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+ .groupBy((key, value) -> value)
+ .count()
+ .toStream().to(outputTopicTwoPartitions, Produced.with(Serdes.String(), Serdes.Long()));
+ return builder.build();
+ }
+
+ private Topology simpleTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream(inputTopicOnePartition, Consumed.with(Serdes.String(), Serdes.String()))
+ .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+ .to(outputTopicOnePartition, Produced.with(Serdes.String(), Serdes.String()));
+ return builder.build();
+ }
+
+
+ public static class TestClientSupplier implements KafkaClientSupplier {
+
+ @Override
+ public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
+ return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+ }
+
+ @Override
+ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+ final TestingMetricsInterceptingConsumer<byte[], byte[]> consumer = new TestingMetricsInterceptingConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ INTERCEPTING_CONSUMERS.add(consumer);
+ return consumer;
+ }
+
+ @Override
+ public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
+ return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ }
+
+ @Override
+ public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
+ return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ }
+
+ @Override
+ public Admin getAdmin(final Map<String, Object> config) {
+ assertTrue((Boolean) config.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
+ final TestingMetricsInterceptingAdminClient adminClient = new TestingMetricsInterceptingAdminClient(config);
+ INTERCEPTING_ADMIN_CLIENTS.add(adminClient);
+ return adminClient;
+ }
+ }
+
+ public static class TestingMetricsInterceptingConsumer<K, V> extends KafkaConsumer<K, V> {
+
+ public List<KafkaMetric> passedMetrics = new ArrayList<>();
+
+ public TestingMetricsInterceptingConsumer(final Map<String, Object> configs, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+ super(configs, keyDeserializer, valueDeserializer);
+ }
+
+ @Override
+ public void registerMetricForSubscription(final KafkaMetric metric) {
+ passedMetrics.add(metric);
+ super.registerMetricForSubscription(metric);
+ }
+
+ @Override
+ public void unregisterMetricFromSubscription(final KafkaMetric metric) {
+ passedMetrics.remove(metric);
+ super.unregisterMetricFromSubscription(metric);
+ }
+ }
+
+ public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter, ClientTelemetryReceiver {
+
+ public static final Map<Uuid, List<String>> SUBSCRIBED_METRICS = new ConcurrentHashMap<>();
+ public TelemetryPlugin() {
+ }
+
+ @Override
+ public void init(final List<KafkaMetric> metrics) {
+ }
+
+ @Override
+ public void metricChange(final KafkaMetric metric) {
+ }
+
+ @Override
+ public void metricRemoval(final KafkaMetric metric) {
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+
+ }
+
+ @Override
+ public ClientTelemetryReceiver clientReceiver() {
+ return this;
+ }
+
+ @Override
+ public void exportMetrics(final AuthorizableRequestContext context, final ClientTelemetryPayload payload) {
+ try {
+ final MetricsData data = MetricsData.parseFrom(payload.data());
+ final Uuid clientId = payload.clientInstanceId();
+ final List<String> metricNames = data.getResourceMetricsList()
+ .stream()
+ .map(rm -> rm.getScopeMetricsList().get(0).getMetrics(0).getName())
+ .sorted()
+ .collect(Collectors.toList());
+ LOG.info("Found metrics {} for clientId={}", metricNames, clientId);
+ SUBSCRIBED_METRICS.put(clientId, metricNames);
+ } catch (final Exception e) {
+ e.printStackTrace(System.err);
+ }
+ }
+ }
+}
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index e1cb7411f8082..a01324ef7fd85 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -427,12 +427,14 @@ public ListShareGroupsResult listShareGroups(final ListShareGroupsOptions option
@Override
public void registerMetricForSubscription(final KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ passedMetrics.add(metric);
+ adminDelegate.registerMetricForSubscription(metric);
}
@Override
public void unregisterMetricFromSubscription(final KafkaMetric metric) {
- throw new UnsupportedOperationException("not implemented");
+ passedMetrics.remove(metric);
+ adminDelegate.unregisterMetricFromSubscription(metric);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index cb6cbbfb1f404..5a59a79ef5aca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -53,6 +53,7 @@
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
+import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
@@ -1016,6 +1017,9 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
metrics = createMetrics(applicationConfigs, time, clientId);
+ final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId);
+ metrics.addReporter(reporter);
+
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ae12240f9976d..d24575a05d88f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1230,7 +1230,7 @@ public class StreamsConfig extends AbstractConfig {
private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES;
static {
final Map<String, Object> tempAdminClientDefaultOverrides = new HashMap<>();
- tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
+ tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
ADMIN_CLIENT_OVERRIDES = Collections.unmodifiableMap(tempAdminClientDefaultOverrides);
}
@@ -1811,7 +1811,6 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
// add client id with stream client id prefix
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
-
return baseConsumerProps;
}
@@ -1857,7 +1856,6 @@ public Map<String, Object> getAdminConfigs(final String clientId) {
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
-
return props;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e45021f25c374..e492d180175ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -48,6 +48,7 @@
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
+import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
@@ -87,6 +88,9 @@
public class StreamThread extends Thread implements ProcessingThread {
+ private static final String THREAD_ID_SUBSTRING = "-StreamThread-";
+ private static final String STATE_UPDATER_ID_SUBSTRING = "-StateUpdater-";
+
/**
* Stream thread states are the possible states that a stream thread can be in.
* A thread must only be in one state at a time
@@ -367,7 +371,8 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
final int threadIdx,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
- final String threadId = clientId + "-StreamThread-" + threadIdx;
+
+ final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String logPrefix = String.format("stream-thread [%s] ", threadId);
final LogContext logContext = new LogContext(logPrefix);
@@ -473,6 +478,10 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
taskManager.setMainConsumer(mainConsumer);
referenceContainer.mainConsumer = mainConsumer;
+ final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
+ final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId);
+ streamsMetrics.metricsRegistry().addReporter(reporter);
+
final StreamThread streamThread = new StreamThread(
time,
config,
@@ -533,7 +542,7 @@ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateU
final String clientId,
final int threadIdx) {
if (stateUpdaterEnabled) {
- final String name = clientId + "-StateUpdater-" + threadIdx;
+ final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
final StateUpdater stateUpdater = new DefaultStateUpdater(
name,
streamsMetrics.metricsRegistry(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 95948de7f4d46..31e755c3b1f8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -383,7 +383,7 @@ public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
@Test
public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() {
final Map<String, Object> returnedProps = streamsConfig.getAdminConfigs(clientId);
- assertThat(returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG), equalTo("true"));
+ assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
}
@Test