Added integration tests, set logging level in reporter to debug
bbejeck committed Sep 3, 2024
1 parent d3c347d commit b566d34
Showing 2 changed files with 95 additions and 18 deletions.
StreamsThreadDelegatingMetricsReporter.java
@@ -42,7 +42,7 @@ public StreamsThreadDelegatingMetricsReporter(final Consumer<?, ?> consumer, fin
         this.consumer = Objects.requireNonNull(consumer);
         this.threadId = Objects.requireNonNull(threadId);
         this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId);
-        LOG.info("Creating MetricsReporter for threadId {} and stateUpdaterId {}", threadId, stateUpdaterThreadId);
+        LOG.debug("Creating MetricsReporter for threadId {} and stateUpdaterId {}", threadId, stateUpdaterThreadId);
     }
 
     @Override
@@ -53,7 +53,7 @@ public void init(final List<KafkaMetric> metrics) {
     @Override
     public void metricChange(final KafkaMetric metric) {
         if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
-            LOG.info("Registering metric {}", metric.metricName());
+            LOG.debug("Registering metric {}", metric.metricName());
             consumer.registerMetric(metric);
         }
     }
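
Both statements move from INFO to DEBUG; since metricChange runs once per registered metric, they would be chatty at INFO. The tagMatchStreamOrStateUpdaterThreadId check that gates registration is not part of this diff; below is a minimal sketch of the filter it implies, assuming the reporter matches the metric's "thread-id" tag against the two ids stored by the constructor above (the tag name and the method body are assumptions, not the committed code):

    // Hypothetical sketch, not the committed implementation: pass a metric
    // through only when its "thread-id" tag names this reporter's stream
    // thread or its state-updater thread.
    private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
        final String metricThreadId = metric.metricName().tags().get("thread-id");
        return metricThreadId != null
                && (metricThreadId.equals(threadId) || metricThreadId.equals(stateUpdaterThreadId));
    }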
KafkaStreamsTelemetryIntegrationTest.java
@@ -90,6 +90,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
     private String outputTopicOnePartition;
     private final List<Properties> streamsConfigurations = new ArrayList<>();
     private static final List<MetricsInterceptingConsumer<?, ?>> INTERCEPTING_CONSUMERS = new ArrayList<>();
+    private static final int FIRST_INSTANCE_CONSUMER = 0;
+    private static final int SECOND_INSTANCE_CONSUMER = 1;
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -123,7 +125,7 @@ public void tearDown() throws Exception {
 
 
     @ParameterizedTest
-    @MethodSource("provideStreamParameters")
+    @MethodSource("singleAndMultiTaskParameters")
     @DisplayName("Streams metrics should get passed to Consumer")
     void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws InterruptedException {
         final Properties properties = props(stateUpdaterEnabled);
@@ -134,31 +136,108 @@ void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnab
             waitForCondition(() -> KafkaStreams.State.RUNNING == streams.state(),
                 IntegrationTestUtils.DEFAULT_TIMEOUT,
                 () -> "Kafka Streams never transitioned to a RUNNING state.");
 
             final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
                     .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
-            final List<MetricName> consumerPassedStreamMetricNames = INTERCEPTING_CONSUMERS.get(0).addedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+
+            final List<MetricName> consumerPassedStreamMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
 
             assertEquals(streamsThreadMetrics.size(), consumerPassedStreamMetricNames.size());
-            consumerPassedStreamMetricNames.forEach(metricName -> assertTrue(streamsThreadMetrics.contains(metricName)));
+            consumerPassedStreamMetricNames.forEach(metricName -> assertTrue(streamsThreadMetrics.contains(metricName), "Streams metrics don't contain " + metricName));
         }
     }
 
-    private static Stream<Arguments> provideStreamParameters() {
+    @ParameterizedTest
+    @MethodSource("multiTaskParameters")
+    @DisplayName("Correct Streams metrics should get passed with dynamic membership")
+    void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterEnabled) throws InterruptedException {
+        final Properties properties1 = props(stateUpdaterEnabled);
+        properties1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1");
+        properties1.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks1");
+
+        final Properties properties2 = props(stateUpdaterEnabled);
+        properties2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2");
+        properties2.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2");
+
+        final Topology topology = complexTopology();
+        try (final KafkaStreams streamsOne = new KafkaStreams(topology, properties1)) {
+            streamsOne.start();
+            waitForCondition(() -> KafkaStreams.State.RUNNING == streamsOne.state(),
+                IntegrationTestUtils.DEFAULT_TIMEOUT,
+                () -> "Kafka Streams never transitioned to a RUNNING state.");
+
+            final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
+                    .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+            final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER).passedMetrics.stream().map(KafkaMetric::metricName)
+                    .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+            final List<String> streamTaskIds = getTaskIdsAsStrings(streamsOne);
+            final long consumerPassedTaskMetricCount = consumerPassedStreamTaskMetricNames.stream().filter(metricName -> streamTaskIds.contains(metricName.tags().get("task-id"))).count();
+            assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size());
+            assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size());
+
+            try (final KafkaStreams streamsTwo = new KafkaStreams(topology, properties2)) {
+                streamsTwo.start();
+                waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
+                    IntegrationTestUtils.DEFAULT_TIMEOUT,
+                    () -> "Kafka Streams one or two never transitioned to a RUNNING state.");
+
+                final List<String> streamOneTaskIds = getTaskIdsAsStrings(streamsOne);
+                final List<String> streamTwoTasksIds = getTaskIdsAsStrings(streamsTwo);
+
+                final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
+                        .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+                final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER)
+                        .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+                final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
+                        .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+                final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CONSUMER)
+                        .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+                final long consumerOneStreamOneTaskCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+                final long consumerOneTaskTwoMetricCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+
+                final long consumerTwoStreamTwoTaskCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+                final long consumerTwoTaskOneMetricCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+
+                assertEquals(streamsOneTaskMetrics.size(), consumerOneStreamOneTaskCount);
+                assertEquals(0, consumerOneTaskTwoMetricCount);
+
+                assertEquals(streamsTwoTaskMetrics.size(), consumerTwoStreamTwoTaskCount);
+                assertEquals(0, consumerTwoTaskOneMetricCount);
+            }
+        }
+    }
+
+    private List<String> getTaskIdsAsStrings(final KafkaStreams streams) {
+        return streams.metadataForLocalThreads().stream()
+                .flatMap(threadMeta -> threadMeta.activeTasks().stream()
+                        .map(taskMeta -> taskMeta.taskId().toString()))
+                .collect(Collectors.toList());
+    }
+
+    private static Stream<Arguments> singleAndMultiTaskParameters() {
         return Stream.of(Arguments.of("simple", true),
                 Arguments.of("simple", false),
                 Arguments.of("complex", true),
                 Arguments.of("complex", false));
     }
 
-    private Properties props() {
-        return props(new Properties());
+    private static Stream<Arguments> multiTaskParameters() {
+        return Stream.of(Arguments.of(true),
+                Arguments.of(false));
     }
 
+    private Properties props(final boolean stateUpdaterEnabled) {
+        return props(mkObjectProperties(mkMap(mkEntry(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled))));
+    }
+
     private Properties props(final Properties extraProperties) {
         final Properties streamsConfiguration = new Properties();
 
@@ -228,8 +307,7 @@ public Admin getAdmin(final Map<String, Object> config) {

     public static class MetricsInterceptingConsumer<K, V> extends KafkaConsumer<K, V> {
 
-        public List<KafkaMetric> addedMetrics = new ArrayList<>();
-        public List<KafkaMetric> removedMetrics = new ArrayList<>();
+        public List<KafkaMetric> passedMetrics = new ArrayList<>();
 
         public MetricsInterceptingConsumer(final Map<String, Object> configs) {
             super(configs);
@@ -249,14 +327,13 @@ public MetricsInterceptingConsumer(final Map<String, Object> configs, final Dese

         @Override
         public void registerMetric(final KafkaMetric metric) {
-            addedMetrics.add(metric);
+            passedMetrics.add(metric);
             super.registerMetric(metric);
         }
 
         @Override
         public void unregisterMetric(final KafkaMetric metric) {
-            addedMetrics.remove(metric);
-            removedMetrics.add(metric);
+            passedMetrics.remove(metric);
             super.unregisterMetric(metric);
         }
     }
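The getAdmin context line in the hunk above suggests the test hands Kafka Streams a custom KafkaClientSupplier so that every main consumer it creates is a MetricsInterceptingConsumer recorded in INTERCEPTING_CONSUMERS. Here is a minimal sketch of such a supplier; the class name and the wiring of the non-intercepting clients are assumptions, not the committed test code:

    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.KafkaClientSupplier;

    // Hypothetical sketch -- the committed test's supplier may differ.
    public static class InterceptingClientSupplier implements KafkaClientSupplier {

        @Override
        public Admin getAdmin(final Map<String, Object> config) {
            return Admin.create(config);
        }

        @Override
        public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
            return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        }

        @Override
        public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
            // Record the main consumer so the tests can assert on passedMetrics later.
            final MetricsInterceptingConsumer<byte[], byte[]> consumer =
                new MetricsInterceptingConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            INTERCEPTING_CONSUMERS.add(consumer);
            return consumer;
        }

        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }

        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
    }

With one stream thread per instance, the first KafkaStreams instance's main consumer lands at index FIRST_INSTANCE_CONSUMER and the second instance's at SECOND_INSTANCE_CONSUMER, which is the ordering the dynamic-membership test relies on.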
