Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16550: add integration test for LogDirsCommand #16485

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 117 additions & 1 deletion tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,142 @@
*/
package org.apache.kafka.tools;

import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.junit.ClusterTestExtensions;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(value = ClusterTestExtensions.class)
public class LogDirsCommandTest {
private static final String TOPIC = "test-log-dirs-topic";

@ClusterTest(brokers = 3)
public void testLogDirsWithoutBrokers(ClusterInstance clusterInstance) {
createTopic(clusterInstance, TOPIC);
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
String output = assertDoesNotThrow(() -> execute(fromArgsToOptions("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe"), admin));

// check all brokers are present
clusterInstance.brokerIds().forEach(brokerId -> assertTrue(output.contains("\"broker\":" + brokerId)));

// check all log dirs and topic partitions are present
Map<Integer, Map<String, LogDirDescription>> logDirs = assertDoesNotThrow(() -> admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get());
assertFalse(logDirs.isEmpty());
logDirs.forEach((brokerId, logDirInfo) ->
logDirInfo.forEach((logDir, logDirInfoValue) -> {
assertTrue(output.contains("\"logDir\":\"" + logDir + "\""));
logDirInfoValue.replicaInfos().forEach((topicPartition, replicaInfo) -> assertTrue(output.contains("\"partition\":\"" + topicPartition + "\"")));
}));
}
}

@ClusterTest(brokers = 3)
public void testLogDirsWithBrokers(ClusterInstance clusterInstance) {
createTopic(clusterInstance, TOPIC);
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
int brokerId = 0;
String output = assertDoesNotThrow(() -> execute(fromArgsToOptions("--bootstrap-server", clusterInstance.bootstrapServers(), "--broker-list", String.valueOf(brokerId), "--describe"), admin));

// check only broker in list is present
assertTrue(output.contains("\"broker\":" + brokerId));
clusterInstance.brokerIds().stream().filter(id -> id != brokerId).forEach(id -> assertFalse(output.contains("\"broker\":" + id)));

// check log dir and topic partition are present
Map<Integer, Map<String, LogDirDescription>> logDirs = assertDoesNotThrow(() -> admin.describeLogDirs(Collections.singleton(brokerId)).allDescriptions().get());
assertEquals(1, logDirs.size());
logDirs.forEach((brokerIdValue, logDirInfo) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove {}

assertFalse(logDirInfo.isEmpty());
logDirInfo.forEach((logDir, logDirInfoValue) -> {
assertTrue(output.contains("\"logDir\":\"" + logDir + "\""));
Optional<TopicPartition> topicPartition = logDirInfoValue.replicaInfos().keySet().stream().filter(tp -> tp.topic().equals(TOPIC)).findFirst();
assertTrue(topicPartition.isPresent());
assertTrue(output.contains("\"partition\":\"" + topicPartition.get() + "\""));
});
});
}
}

@ClusterTest
public void testLogDirsWithNonExistentTopic(ClusterInstance clusterInstance) {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
String output = assertDoesNotThrow(() -> execute(fromArgsToOptions("--bootstrap-server", clusterInstance.bootstrapServers(), "--topic-list", TOPIC, "--describe"), admin));
// check all brokers are present
clusterInstance.brokerIds().forEach(brokerId -> assertTrue(output.contains("\"broker\":" + brokerId)));

// check log dir is present, but topic partition is not
Map<Integer, Map<String, LogDirDescription>> logDirs = assertDoesNotThrow(() -> admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get());
assertFalse(logDirs.isEmpty());
logDirs.forEach((brokerId, logDirInfo) ->
logDirInfo.forEach((logDir, logDirInfoValue) -> {
assertTrue(output.contains("\"logDir\":\"" + logDir + "\""));
logDirInfoValue.replicaInfos().forEach((topicPartition, replicaInfo) -> {
assertFalse(output.contains("\"partition\":\"" + topicPartition + "\""));
});
}));
}
}

@ClusterTest
public void testLogDirsWithSpecificTopic(ClusterInstance clusterInstance) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add test to verify that it returns only the specific topic will get returned? for example, you can create another topic before testing.

createTopic(clusterInstance, TOPIC);
createTopic(clusterInstance, "other-topic");
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
String output = assertDoesNotThrow(() -> execute(fromArgsToOptions("--bootstrap-server", clusterInstance.bootstrapServers(), "--topic-list", TOPIC, "--describe"), admin));
// check all brokers are present
clusterInstance.brokerIds().forEach(brokerId -> assertTrue(output.contains("\"broker\":" + brokerId)));

// check log dir is present and only one topic partition is present
Map<Integer, Map<String, LogDirDescription>> logDirs = assertDoesNotThrow(() -> admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get());
assertFalse(logDirs.isEmpty());
assertTrue(output.contains("\"partition\":\"" + new TopicPartition(TOPIC, 0) + "\""));
assertFalse(output.contains("\"partition\":\"" + new TopicPartition("other-topic", 0) + "\"")); // other-topic should not be present
logDirs.forEach((brokerId, logDirInfo) ->
logDirInfo.forEach((logDir, logDirInfoValue) -> {
assertTrue(output.contains("\"logDir\":\"" + logDir + "\""));
logDirInfoValue.replicaInfos().keySet().stream().filter(tp -> !tp.topic().equals(TOPIC)).forEach(tp -> {
assertFalse(output.contains("\"partition\":\"" + tp + "\""));
});
}));
}
}

@Test
public void shouldThrowWhenQueryingNonExistentBrokers() {
Node broker = new Node(1, "hostname", 9092);
try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
RuntimeException exception = assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
assertNotNull(exception.getCause());
assertEquals(TerseException.class, exception.getCause().getClass());
assertEquals("ERROR: The given brokers do not exist from --broker-list: 0,2. Current existent brokers: 1", exception.getCause().getMessage());
}
}

Expand Down Expand Up @@ -113,4 +222,11 @@ private String execute(LogDirsCommand.LogDirsCommandOptions options, Admin admin
};
return ToolsTestUtils.captureStandardOut(runnable);
}

private void createTopic(ClusterInstance clusterInstance, String topic) {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
assertDoesNotThrow(() -> admin.createTopics(Collections.singletonList(new NewTopic(topic, Collections.singletonMap(0, Collections.singletonList(0))))).topicId(topic).get());
assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 1));
}
}
}