From d6f991948e7a5401b3ecd9fc827c11d87029dd52 Mon Sep 17 00:00:00 2001 From: bbejeck Date: Tue, 3 Sep 2024 18:28:14 -0400 Subject: [PATCH] Added test to confirm stream metrics don't bleed into consumer metrics --- .../KafkaStreamsTelemetryIntegrationTest.java | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index 89751ec679390..fd51f042e18f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -49,13 +49,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -73,13 +72,13 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") public class KafkaStreamsTelemetryIntegrationTest { - private static final Logger log = LoggerFactory.getLogger(KafkaStreamsTelemetryIntegrationTest.class); private static final int NUM_BROKERS = 1; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); @@ -215,6 +214,27 @@ void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterEnabled) } } + @Test + @DisplayName("Streams metrics should not be visible in consumer metrics") + void passedMetricsShouldNotLeakIntoConsumerMetrics() throws InterruptedException { + final Properties properties = props(true); + final Topology topology = complexTopology(); + + try (final KafkaStreams streams = new KafkaStreams(topology, properties)) { + streams.start(); + waitForCondition(() -> KafkaStreams.State.RUNNING == streams.state(), + IntegrationTestUtils.DEFAULT_TIMEOUT, + () -> "Kafka Streams never transitioned to a RUNNING state."); + + final List streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) + .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); + + final Map embeddedConsumerMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER).metrics(); + + streamsThreadMetrics.forEach(metricName -> assertFalse(embeddedConsumerMetrics.containsKey(metricName), "Stream thread metric found in client metrics" + metricName)); + } + } + private List getTaskIdsAsStrings(final KafkaStreams streams) { return streams.metadataForLocalThreads().stream() .flatMap(threadMeta -> threadMeta.activeTasks().stream()