BE: Update Kafka config-related info on schedule #3764
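
This PR moves retrieval of config-related cluster info (the broker's inter.broker.protocol.version, reported as the Kafka version, the set of supported admin-client features, and the delete.topic.enable flag) into ReactiveAdminClient. The values are cached in a ConfigRelatedInfo record and refreshed from the controller on every scheduled statistics update.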

Merged 33 commits on May 10, 2023

Commits:
4f3ae69 ISSUE-2787: (Oct 20, 2022)
86d931a ReactiveAdminClient.SupportedFeature refactor (Oct 20, 2022)
83bccc1 ISSUE-754: Backend for kafka ACLs (Oct 24, 2022)
c1d9f01 ISSUE-754: UnsupportedVersionException handling added when trying che… (Oct 26, 2022)
fcfdc69 merged with master (Oct 31, 2022)
fd6fb51 Merge branch 'master' into ISSUE_754_acl (iliax, Oct 31, 2022)
257cc7e Merge branch 'master' into ISSUE_754_acl (iliax, Oct 31, 2022)
13af67c syncAclWithAclCsv test added (Jan 18, 2023)
b16ec76 merge with master (Jan 18, 2023)
0db0af0 checkstyle fix (Jan 18, 2023)
cb17449 RBAC integration added (Jan 18, 2023)
4173f78 Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE_754… (Mar 13, 2023)
ffff964 PR updated (Mar 13, 2023)
297a1d9 wip (Mar 14, 2023)
e80096e Merge branch 'master' into ISSUE_754_acl (iliax, Mar 14, 2023)
be26f86 wip (Mar 14, 2023)
8e5cf2b wip (Mar 14, 2023)
8532085 Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE_754… (Mar 15, 2023)
e093ec6 Merge branch 'master' into ISSUE_754_acl (iliax, Mar 20, 2023)
931b3d1 Merge branch 'master' into ISSUE_754_acl (iliax, Mar 27, 2023)
d323b89 minor fixes (Apr 3, 2023)
be80920 Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE_754… (Apr 3, 2023)
a58c205 Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE_754… (Apr 11, 2023)
807ca5a filter params added to acls endpoint (Apr 21, 2023)
23a3906 Merge branch 'master' into ISSUE_754_acl (iliax, Apr 21, 2023)
2d90897 Merge branch 'master' into ISSUE_754_acl (iliax, Apr 24, 2023)
779ba46 Merge branch 'master' into ISSUE_754_acl (iliax, Apr 25, 2023)
27fede1 Merge branch 'master' into ISSUE_754_acl (iliax, Apr 28, 2023)
0be17bc Merge branch 'master' into ISSUE_754_acl (iliax, May 2, 2023)
00dcff6 kafka-ui-acl-with-zk.yaml added, kafka-ui-sasl.yaml reverted (May 2, 2023)
41020e1 Retrieving config-related info (version, features) on stats update (May 2, 2023)
2d78a6e Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE-374… (May 2, 2023)
fe28124 merge with master (May 2, 2023)
FeatureService.java
@@ -4,16 +4,13 @@
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
@@ -24,11 +21,10 @@
@Slf4j
public class FeatureService {

private static final String DELETE_TOPIC_ENABLED_SERVER_PROPERTY = "delete.topic.enable";

private final AdminClientService adminClientService;

public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster,
public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
KafkaCluster cluster,
ClusterDescription clusterDescription) {
List<Mono<ClusterFeature>> features = new ArrayList<>();

@@ -46,29 +42,17 @@ public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster,
features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
}

features.add(topicDeletionEnabled(cluster, clusterDescription.getController()));
features.add(topicDeletionEnabled(adminClient));
features.add(aclView(cluster));
features.add(aclEdit(clusterDescription));

return Flux.fromIterable(features).flatMap(m -> m).collectList();
}

private Mono<ClusterFeature> topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) {
if (controller == null) {
return Mono.just(ClusterFeature.TOPIC_DELETION); // assuming it is enabled by default
}
return adminClientService.get(cluster)
.flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
.map(config ->
config.values().stream()
.flatMap(Collection::stream)
.filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
.map(e -> Boolean.parseBoolean(e.value()))
.findFirst()
.orElse(true))
.flatMap(enabled -> enabled
? Mono.just(ClusterFeature.TOPIC_DELETION)
: Mono.empty());
private Mono<ClusterFeature> topicDeletionEnabled(ReactiveAdminClient adminClient) {
return adminClient.isTopicDeletionEnabled()
? Mono.just(ClusterFeature.TOPIC_DELETION)
: Mono.empty();
}

private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
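
For illustration, a minimal caller sketch of the reworked feature check, assuming the service wiring of this module; featureService, adminClient, cluster, and clusterDescription are placeholder references, not code from this PR:

```java
// Hypothetical usage (not part of the PR): TOPIC_DELETION is now answered from the admin
// client's cached config snapshot (delete.topic.enable) rather than a per-call broker
// config lookup through AdminClientService.
Mono<List<ClusterFeature>> features =
    featureService.getAvailableFeatures(adminClient, cluster, clusterDescription);

features.subscribe(enabled ->
    System.out.println("Topic deletion allowed: " + enabled.contains(ClusterFeature.TOPIC_DELETION)));
```
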
ReactiveAdminClient.java
@@ -32,8 +32,9 @@
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
@@ -75,7 +76,6 @@
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -85,7 +85,7 @@


@Slf4j
@RequiredArgsConstructor
@AllArgsConstructor
public class ReactiveAdminClient implements Closeable {

public enum SupportedFeature {
@@ -104,7 +104,8 @@ public enum SupportedFeature {
this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
}

static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, @Nullable Float kafkaVersion) {
static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersionStr) {
@Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null);
return Flux.fromArray(SupportedFeature.values())
.flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
.filter(Tuple2::getT2)
@@ -123,19 +124,46 @@ public static class ClusterDescription {
Set<AclOperation> authorizedOperations;
}

public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
return getClusterVersion(adminClient)
.flatMap(ver ->
getSupportedUpdateFeaturesForVersion(adminClient, ver)
.map(features ->
new ReactiveAdminClient(adminClient, ver, features)));
@Builder
private record ConfigRelatedInfo(String version,
Set<SupportedFeature> features,
boolean topicDeletionIsAllowed) {

private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
return loadBrokersConfig(ac, List.of(controllerId))
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
.flatMap(configs -> {
String version = "1.0-UNKNOWN";
boolean topicDeletionEnabled = true;
for (ConfigEntry entry : configs) {
if (entry.name().contains("inter.broker.protocol.version")) {
version = entry.value();
}
if (entry.name().equals("delete.topic.enable")) {
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
}
}
var builder = ConfigRelatedInfo.builder()
.version(version)
.topicDeletionIsAllowed(topicDeletionEnabled);
return SupportedFeature.forVersion(ac, version)
.map(features -> builder.features(features).build());
});
}
}

private static Mono<Set<SupportedFeature>> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) {
@Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null);
return SupportedFeature.forVersion(ac, kafkaVersion);
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
return describeClusterImpl(adminClient, Set.of())
// choosing node from which we will get configs (starting with controller)
.flatMap(descr -> descr.controller != null
? Mono.just(descr.controller)
: Mono.justOrEmpty(descr.nodes.stream().findFirst())
)
.flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
.map(info -> new ReactiveAdminClient(adminClient, info));
}


private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
.thenReturn(true)
@@ -174,11 +202,10 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {

@Getter(AccessLevel.PACKAGE) // visible for testing
private final AdminClient client;
private final String version;
private final Set<SupportedFeature> features;
private volatile ConfigRelatedInfo configRelatedInfo;

public Set<SupportedFeature> getClusterFeatures() {
return features;
return configRelatedInfo.features();
}

public Mono<Set<String>> listTopics(boolean listInternal) {
@@ -190,7 +217,20 @@ public Mono<Void> deleteTopic(String topicName) {
}

public String getVersion() {
return version;
return configRelatedInfo.version();
}

public boolean isTopicDeletionEnabled() {
return configRelatedInfo.topicDeletionIsAllowed();
}

public Mono<Void> updateInternalStats(@Nullable Node controller) {
if (controller == null) {
return Mono.empty();
}
return ConfigRelatedInfo.extract(client, controller.id())
.doOnNext(info -> this.configRelatedInfo = info)
.then();
}

public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
@@ -200,7 +240,7 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
//NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
//and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
return partitionCalls(
topicNames,
@@ -349,7 +389,7 @@ public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> descr
}

public Mono<ClusterDescription> describeCluster() {
return describeClusterImpl(client, features);
return describeClusterImpl(client, getClusterFeatures());
}

private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
@@ -371,23 +411,6 @@ private static Mono<ClusterDescription> describeClusterImpl(AdminClient client,
);
}

private static Mono<String> getClusterVersion(AdminClient client) {
return describeClusterImpl(client, Set.of())
// choosing node from which we will get configs (starting with controller)
.flatMap(descr -> descr.controller != null
? Mono.just(descr.controller)
: Mono.justOrEmpty(descr.nodes.stream().findFirst())
)
.flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
.flatMap(configs -> configs.values().stream()
.flatMap(Collection::stream)
.filter(entry -> entry.name().contains("inter.broker.protocol.version"))
.findFirst()
.map(configEntry -> Mono.just(configEntry.value()))
.orElse(Mono.empty()))
.switchIfEmpty(Mono.just("1.0-UNKNOWN"));
}

public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
return toMono(client.deleteConsumerGroups(groupIds).all())
.onErrorResume(GroupIdNotFoundException.class,
@@ -421,7 +444,7 @@ public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap)
// NOTE: places whole current topic config with new one. Entries that were present in old config,
// but missed in new will be set to default
public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
if (getClusterFeatures().contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
return getTopicsConfigImpl(List.of(topicName), false)
.map(conf -> conf.getOrDefault(topicName, List.of()))
.flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
@@ -596,17 +619,17 @@ Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> par
}

public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
}

public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.createAcls(aclBindings).all());
}

public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
return toMono(client.deleteAcls(filters).all()).then();
}
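
A self-contained usage sketch of the new create()/updateInternalStats() pair follows; the wrapper class, the bootstrap address, and the blocking calls are illustrative assumptions, while the ReactiveAdminClient methods are the ones added in this diff:

```java
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ReactiveAdminClientUsageSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    AdminClient ac = AdminClient.create(props);

    // create() now bootstraps the ConfigRelatedInfo snapshot from the controller
    // (or any available node) before the client is handed out.
    ReactiveAdminClient rac = ReactiveAdminClient.create(ac).block();

    // On each scheduled statistics run the controller's config is re-read, so the
    // version, supported features and delete.topic.enable flag stay current.
    rac.describeCluster()
        .flatMap(descr -> rac.updateInternalStats(descr.getController()))
        .block();

    System.out.println("Kafka version: " + rac.getVersion());
    System.out.println("Topic deletion enabled: " + rac.isTopicDeletionEnabled());

    ac.close();
  }
}
```
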
@@ -37,25 +37,26 @@ public Mono<Statistics> updateCache(KafkaCluster c) {
private Mono<Statistics> getStatistics(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(ac ->
ac.describeCluster().flatMap(description ->
Mono.zip(
List.of(
metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
getLogDirInfo(description, ac),
featureService.getAvailableFeatures(cluster, description),
loadTopicConfigs(cluster),
describeTopics(cluster)),
results ->
Statistics.builder()
.status(ServerStatusDTO.ONLINE)
.clusterDescription(description)
.version(ac.getVersion())
.metrics((Metrics) results[0])
.logDirInfo((InternalLogDirStats) results[1])
.features((List<ClusterFeature>) results[2])
.topicConfigs((Map<String, List<ConfigEntry>>) results[3])
.topicDescriptions((Map<String, TopicDescription>) results[4])
.build()
)))
ac.updateInternalStats(description.getController()).then(
Mono.zip(
List.of(
metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
getLogDirInfo(description, ac),
featureService.getAvailableFeatures(ac, cluster, description),
loadTopicConfigs(cluster),
describeTopics(cluster)),
results ->
Statistics.builder()
.status(ServerStatusDTO.ONLINE)
.clusterDescription(description)
.version(ac.getVersion())
.metrics((Metrics) results[0])
.logDirInfo((InternalLogDirStats) results[1])
.features((List<ClusterFeature>) results[2])
.topicConfigs((Map<String, List<ConfigEntry>>) results[3])
.topicDescriptions((Map<String, TopicDescription>) results[4])
.build()
))))
.doOnError(e ->
log.error("Failed to collect cluster {} info", cluster.getName(), e))
.onErrorResume(
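
For reference, a standalone sketch of the broker-config read that ConfigRelatedInfo.extract() performs, written directly against the Kafka AdminClient; broker id "0" and the localhost:9092 bootstrap address are assumptions for the example, and the fallback values mirror the ones in the diff above:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerConfigProbe {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    try (AdminClient ac = AdminClient.create(props)) {

      // Read the broker-level config of one node (the PR uses the controller when present).
      ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
      Config config = ac.describeConfigs(List.of(broker)).all().get().get(broker);

      String version = "1.0-UNKNOWN";       // same fallback as ConfigRelatedInfo.extract()
      boolean topicDeletionEnabled = true;  // Kafka's default for delete.topic.enable
      for (ConfigEntry entry : config.entries()) {
        if (entry.name().contains("inter.broker.protocol.version")) {
          version = entry.value();
        }
        if (entry.name().equals("delete.topic.enable")) {
          topicDeletionEnabled = Boolean.parseBoolean(entry.value());
        }
      }
      System.out.println(version + ", delete.topic.enable=" + topicDeletionEnabled);
    }
  }
}
```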