From aa7429eeba2e942846fe071d72d4621dbd2aa70b Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Wed, 10 May 2023 10:45:18 +0400 Subject: [PATCH] BE: Update Kafka config-related info on schedule (#3764) Co-authored-by: iliax --- .../kafka/ui/service/FeatureService.java | 30 ++--- .../kafka/ui/service/ReactiveAdminClient.java | 103 +++++++++++------- .../kafka/ui/service/StatisticsService.java | 39 +++---- 3 files changed, 90 insertions(+), 82 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index 7ba3f036e93..87b48072c2d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -4,16 +4,13 @@ import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; -import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.Node; import org.apache.kafka.common.acl.AclOperation; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; @@ -24,11 +21,10 @@ @Slf4j public class FeatureService { - private static final String DELETE_TOPIC_ENABLED_SERVER_PROPERTY = "delete.topic.enable"; - private final AdminClientService adminClientService; - public Mono> getAvailableFeatures(KafkaCluster cluster, + public Mono> getAvailableFeatures(ReactiveAdminClient adminClient, + KafkaCluster cluster, ClusterDescription clusterDescription) { List> features = new ArrayList<>(); @@ -46,29 +42,17 @@ public Mono> getAvailableFeatures(KafkaCluster cluster, features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY)); } - features.add(topicDeletionEnabled(cluster, clusterDescription.getController())); + features.add(topicDeletionEnabled(adminClient)); features.add(aclView(cluster)); features.add(aclEdit(clusterDescription)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } - private Mono topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) { - if (controller == null) { - return Mono.just(ClusterFeature.TOPIC_DELETION); // assuming it is enabled by default - } - return adminClientService.get(cluster) - .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id()))) - .map(config -> - config.values().stream() - .flatMap(Collection::stream) - .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY)) - .map(e -> Boolean.parseBoolean(e.value())) - .findFirst() - .orElse(true)) - .flatMap(enabled -> enabled - ? Mono.just(ClusterFeature.TOPIC_DELETION) - : Mono.empty()); + private Mono topicDeletionEnabled(ReactiveAdminClient adminClient) { + return adminClient.isTopicDeletionEnabled() + ? Mono.just(ClusterFeature.TOPIC_DELETION) + : Mono.empty(); } private Mono aclEdit(ClusterDescription clusterDescription) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 8451a89f97d..0b6f16a2235 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -32,8 +32,9 @@ import java.util.stream.Stream; import javax.annotation.Nullable; import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; @@ -75,7 +76,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; -import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -85,7 +85,7 @@ @Slf4j -@RequiredArgsConstructor +@AllArgsConstructor public class ReactiveAdminClient implements Closeable { public enum SupportedFeature { @@ -104,7 +104,8 @@ public enum SupportedFeature { this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion); } - static Mono> forVersion(AdminClient ac, @Nullable Float kafkaVersion) { + static Mono> forVersion(AdminClient ac, String kafkaVersionStr) { + @Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null); return Flux.fromArray(SupportedFeature.values()) .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled))) .filter(Tuple2::getT2) @@ -123,19 +124,46 @@ public static class ClusterDescription { Set authorizedOperations; } - public static Mono create(AdminClient adminClient) { - return getClusterVersion(adminClient) - .flatMap(ver -> - getSupportedUpdateFeaturesForVersion(adminClient, ver) - .map(features -> - new ReactiveAdminClient(adminClient, ver, features))); + @Builder + private record ConfigRelatedInfo(String version, + Set features, + boolean topicDeletionIsAllowed) { + + private static Mono extract(AdminClient ac, int controllerId) { + return loadBrokersConfig(ac, List.of(controllerId)) + .map(map -> map.isEmpty() ? List.of() : map.get(controllerId)) + .flatMap(configs -> { + String version = "1.0-UNKNOWN"; + boolean topicDeletionEnabled = true; + for (ConfigEntry entry : configs) { + if (entry.name().contains("inter.broker.protocol.version")) { + version = entry.value(); + } + if (entry.name().equals("delete.topic.enable")) { + topicDeletionEnabled = Boolean.parseBoolean(entry.value()); + } + } + var builder = ConfigRelatedInfo.builder() + .version(version) + .topicDeletionIsAllowed(topicDeletionEnabled); + return SupportedFeature.forVersion(ac, version) + .map(features -> builder.features(features).build()); + }); + } } - private static Mono> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) { - @Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null); - return SupportedFeature.forVersion(ac, kafkaVersion); + public static Mono create(AdminClient adminClient) { + return describeClusterImpl(adminClient, Set.of()) + // choosing node from which we will get configs (starting with controller) + .flatMap(descr -> descr.controller != null + ? Mono.just(descr.controller) + : Mono.justOrEmpty(descr.nodes.stream().findFirst()) + ) + .flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id())) + .map(info -> new ReactiveAdminClient(adminClient, info)); } + private static Mono isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) { return toMono(ac.describeAcls(AclBindingFilter.ANY).values()) .thenReturn(true) @@ -174,11 +202,10 @@ public static Mono toMono(KafkaFuture future) { @Getter(AccessLevel.PACKAGE) // visible for testing private final AdminClient client; - private final String version; - private final Set features; + private volatile ConfigRelatedInfo configRelatedInfo; public Set getClusterFeatures() { - return features; + return configRelatedInfo.features(); } public Mono> listTopics(boolean listInternal) { @@ -190,7 +217,20 @@ public Mono deleteTopic(String topicName) { } public String getVersion() { - return version; + return configRelatedInfo.version(); + } + + public boolean isTopicDeletionEnabled() { + return configRelatedInfo.topicDeletionIsAllowed(); + } + + public Mono updateInternalStats(@Nullable Node controller) { + if (controller == null) { + return Mono.empty(); + } + return ConfigRelatedInfo.extract(client, controller.id()) + .doOnNext(info -> this.configRelatedInfo = info) + .then(); } public Mono>> getTopicsConfig() { @@ -200,7 +240,7 @@ public Mono>> getTopicsConfig() { //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient) //and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown) public Mono>> getTopicsConfig(Collection topicNames, boolean includeDoc) { - var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc; + var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL); // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count return partitionCalls( topicNames, @@ -349,7 +389,7 @@ public Mono>> descr } public Mono describeCluster() { - return describeClusterImpl(client, features); + return describeClusterImpl(client, getClusterFeatures()); } private static Mono describeClusterImpl(AdminClient client, Set features) { @@ -371,23 +411,6 @@ private static Mono describeClusterImpl(AdminClient client, ); } - private static Mono getClusterVersion(AdminClient client) { - return describeClusterImpl(client, Set.of()) - // choosing node from which we will get configs (starting with controller) - .flatMap(descr -> descr.controller != null - ? Mono.just(descr.controller) - : Mono.justOrEmpty(descr.nodes.stream().findFirst()) - ) - .flatMap(node -> loadBrokersConfig(client, List.of(node.id()))) - .flatMap(configs -> configs.values().stream() - .flatMap(Collection::stream) - .filter(entry -> entry.name().contains("inter.broker.protocol.version")) - .findFirst() - .map(configEntry -> Mono.just(configEntry.value())) - .orElse(Mono.empty())) - .switchIfEmpty(Mono.just("1.0-UNKNOWN")); - } - public Mono deleteConsumerGroups(Collection groupIds) { return toMono(client.deleteConsumerGroups(groupIds).all()) .onErrorResume(GroupIdNotFoundException.class, @@ -421,7 +444,7 @@ public Mono createPartitions(Map newPartitionsMap) // NOTE: places whole current topic config with new one. Entries that were present in old config, // but missed in new will be set to default public Mono updateTopicConfig(String topicName, Map configs) { - if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) { + if (getClusterFeatures().contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) { return getTopicsConfigImpl(List.of(topicName), false) .map(conf -> conf.getOrDefault(topicName, List.of())) .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs)); @@ -596,17 +619,17 @@ Mono> listOffsetsUnsafe(Collection par } public Mono> listAcls(ResourcePatternFilter filter) { - Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values()); } public Mono createAcls(Collection aclBindings) { - Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); return toMono(client.createAcls(aclBindings).all()); } public Mono deleteAcls(Collection aclBindings) { - Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet()); return toMono(client.deleteAcls(filters).all()).then(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java index 994c30714ae..19d946590c4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java @@ -37,25 +37,26 @@ public Mono updateCache(KafkaCluster c) { private Mono getStatistics(KafkaCluster cluster) { return adminClientService.get(cluster).flatMap(ac -> ac.describeCluster().flatMap(description -> - Mono.zip( - List.of( - metricsCollector.getBrokerMetrics(cluster, description.getNodes()), - getLogDirInfo(description, ac), - featureService.getAvailableFeatures(cluster, description), - loadTopicConfigs(cluster), - describeTopics(cluster)), - results -> - Statistics.builder() - .status(ServerStatusDTO.ONLINE) - .clusterDescription(description) - .version(ac.getVersion()) - .metrics((Metrics) results[0]) - .logDirInfo((InternalLogDirStats) results[1]) - .features((List) results[2]) - .topicConfigs((Map>) results[3]) - .topicDescriptions((Map) results[4]) - .build() - ))) + ac.updateInternalStats(description.getController()).then( + Mono.zip( + List.of( + metricsCollector.getBrokerMetrics(cluster, description.getNodes()), + getLogDirInfo(description, ac), + featureService.getAvailableFeatures(ac, cluster, description), + loadTopicConfigs(cluster), + describeTopics(cluster)), + results -> + Statistics.builder() + .status(ServerStatusDTO.ONLINE) + .clusterDescription(description) + .version(ac.getVersion()) + .metrics((Metrics) results[0]) + .logDirInfo((InternalLogDirStats) results[1]) + .features((List) results[2]) + .topicConfigs((Map>) results[3]) + .topicDescriptions((Map) results[4]) + .build() + )))) .doOnError(e -> log.error("Failed to collect cluster {} info", cluster.getName(), e)) .onErrorResume(