Skip to content

Commit

Permalink
fix tests, start of end-to-end metrics test
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Sep 28, 2024
1 parent 6d8db41 commit 235ccb6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
1 change: 1 addition & 0 deletions core/src/test/java/kafka/admin/ClientTelemetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class ClientTelemetryTest {
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
configs.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
try (Admin admin = Admin.create(configs)) {
String testTopicName = "test_topic";
admin.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, (short) 1)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
Expand Down Expand Up @@ -994,7 +995,9 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,

// use client id instead of thread client id since this admin client may be shared among threads
this.clientSupplier = clientSupplier;
adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
final Map<String, Object> adminConfigs = applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId));
adminConfigs.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
adminClient = clientSupplier.getAdmin(adminConfigs);

log.info("Kafka Streams version: {}", ClientMetrics.version());
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1812,8 +1812,6 @@ public Map<String, Object> getAdminConfigs(final String clientId) {

// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
props.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);

return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -39,6 +40,7 @@
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.apache.kafka.streams.ClientInstanceIds;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
Expand All @@ -64,6 +66,7 @@
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -82,12 +85,12 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(600)
@Tag("integration")
public class KafkaStreamsTelemetryIntegrationTest {

private static EmbeddedKafkaCluster cluster;
private String appId;
private String inputTopicTwoPartitions;
Expand All @@ -97,7 +100,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
private final List<Properties> streamsConfigurations = new ArrayList<>();
private static final List<MetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
private static final List<TestingMetricsInterceptingAdminClient> INTERCEPTING_ADMIN_CLIENTS = new ArrayList<>();
private static final int NUM_BROKERS = 1;
private static final int NUM_BROKERS = 3;
private static final int FIRST_INSTANCE_CONSUMER = 0;
private static final int SECOND_INSTANCE_CONSUMER = 1;

Expand Down Expand Up @@ -153,6 +156,26 @@ public void shouldNotThrowExceptionWhenRemovingNonExistingMetrics() throws Inter
}
}

@Test
@DisplayName("End-to-end test validating metrics pushed to broker")
public void shouldPushMetricsToBroker() throws Exception {
final Properties properties = props(true);
final Topology topology = complexTopology();
try (final KafkaStreams streams = new KafkaStreams(topology, properties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ofSeconds(60));
final Uuid adminInstanceId = clientInstanceIds.adminInstanceId();
final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream()
.filter(entry -> entry.getKey().endsWith("1-consumer"))
.map(Map.Entry::getValue)
.findFirst().get();
assertNotNull(adminInstanceId);
assertNotNull(mainConsumerInstanceId);
}
}


@ParameterizedTest
@MethodSource("singleAndMultiTaskParameters")
@DisplayName("Streams metrics should get passed to Consumer")
Expand Down

0 comments on commit 235ccb6

Please sign in to comment.