diff --git a/documentation/compose/jaas/kafka_server.conf b/documentation/compose/jaas/kafka_server.conf index 25388be5aa8..0c1fb34652a 100644 --- a/documentation/compose/jaas/kafka_server.conf +++ b/documentation/compose/jaas/kafka_server.conf @@ -11,4 +11,8 @@ KafkaClient { user_admin="admin-secret"; }; -Client {}; +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="zkuser" + password="zkuserpassword"; +}; diff --git a/documentation/compose/jaas/zookeeper_jaas.conf b/documentation/compose/jaas/zookeeper_jaas.conf new file mode 100644 index 00000000000..2d7fd1b1c29 --- /dev/null +++ b/documentation/compose/jaas/zookeeper_jaas.conf @@ -0,0 +1,4 @@ +Server { + org.apache.zookeeper.server.auth.DigestLoginModule required + user_zkuser="zkuserpassword"; +}; diff --git a/documentation/compose/kafka-ui-acl-with-zk.yaml b/documentation/compose/kafka-ui-acl-with-zk.yaml new file mode 100644 index 00000000000..e1d70b29702 --- /dev/null +++ b/documentation/compose/kafka-ui-acl-with-zk.yaml @@ -0,0 +1,59 @@ +--- +version: '2' +services: + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + - zookeeper + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT + KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN + KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";' + + zookeeper: + image: wurstmeister/zookeeper:3.4.6 + environment: + JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf" + volumes: + - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf + ports: + - 2181:2181 + + kafka: + image: confluentinc/cp-kafka:7.2.1 + hostname: kafka + container_name: kafka + ports: + - "9092:9092" + - "9997:9997" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf" + KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9997 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT' + KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN' + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN' + KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT' + KAFKA_SUPER_USERS: 'User:admin' + volumes: + - ./scripts/update_run.sh:/tmp/update_run.sh + - ./jaas:/etc/kafka/jaas diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java new file mode 100644 index 00000000000..83d2ef553e8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java @@ -0,0 +1,115 @@ +package com.provectus.kafka.ui.controller; + +import com.provectus.kafka.ui.api.AclsApi; +import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.KafkaAclDTO; +import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO; +import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO; +import com.provectus.kafka.ui.model.rbac.AccessContext; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; +import com.provectus.kafka.ui.service.acl.AclsService; +import com.provectus.kafka.ui.service.rbac.AccessControlService; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class AclsController extends AbstractController implements AclsApi { + + private final AclsService aclsService; + private final AccessControlService accessControlService; + + @Override + public Mono> createAcl(String clusterName, Mono kafkaAclDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(kafkaAclDto) + .map(ClusterMapper::toAclBinding) + .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono> deleteAcl(String clusterName, Mono kafkaAclDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(kafkaAclDto) + .map(ClusterMapper::toAclBinding) + .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono>> listAcls(String clusterName, + KafkaAclResourceTypeDTO resourceTypeDto, + String resourceName, + KafkaAclNamePatternTypeDTO namePatternTypeDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.VIEW) + .build(); + + var resourceType = Optional.ofNullable(resourceTypeDto) + .map(ClusterMapper::mapAclResourceTypeDto) + .orElse(ResourceType.ANY); + + var namePatternType = Optional.ofNullable(namePatternTypeDto) + .map(ClusterMapper::mapPatternTypeDto) + .orElse(PatternType.ANY); + + var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType); + + return accessControlService.validateAccess(context).then( + Mono.just( + ResponseEntity.ok( + aclsService.listAcls(getCluster(clusterName), filter) + .map(ClusterMapper::toKafkaAclDto))) + ); + } + + @Override + public Mono> getAclAsCsv(String clusterName, ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.VIEW) + .build(); + + return accessControlService.validateAccess(context).then( + aclsService.getAclAsCsvString(getCluster(clusterName)) + .map(ResponseEntity::ok) + .flatMap(Mono::just) + ); + } + + @Override + public Mono> syncAclsCsv(String clusterName, Mono csvMono, ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(csvMono) + .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv)) + .thenReturn(ResponseEntity.ok().build()); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index d989ce93ba7..a122a269a4e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -20,6 +20,9 @@ import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; +import com.provectus.kafka.ui.model.KafkaAclDTO; +import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO; +import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO; import com.provectus.kafka.ui.model.MetricDTO; import com.provectus.kafka.ui.model.Metrics; import com.provectus.kafka.ui.model.PartitionDTO; @@ -27,12 +30,18 @@ import com.provectus.kafka.ui.model.TopicConfigDTO; import com.provectus.kafka.ui.model.TopicDTO; import com.provectus.kafka.ui.model.TopicDetailsDTO; -import com.provectus.kafka.ui.service.masking.DataMasking; import com.provectus.kafka.ui.service.metrics.RawMetric; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; import org.mapstruct.Mapper; import org.mapstruct.Mapping; @@ -109,8 +118,74 @@ default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBroke return brokerDiskUsage; } - default DataMasking map(List maskingProperties) { - return DataMasking.create(maskingProperties); + static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) { + return switch (operation) { + case ALL -> KafkaAclDTO.OperationEnum.ALL; + case READ -> KafkaAclDTO.OperationEnum.READ; + case WRITE -> KafkaAclDTO.OperationEnum.WRITE; + case CREATE -> KafkaAclDTO.OperationEnum.CREATE; + case DELETE -> KafkaAclDTO.OperationEnum.DELETE; + case ALTER -> KafkaAclDTO.OperationEnum.ALTER; + case DESCRIBE -> KafkaAclDTO.OperationEnum.DESCRIBE; + case CLUSTER_ACTION -> KafkaAclDTO.OperationEnum.CLUSTER_ACTION; + case DESCRIBE_CONFIGS -> KafkaAclDTO.OperationEnum.DESCRIBE_CONFIGS; + case ALTER_CONFIGS -> KafkaAclDTO.OperationEnum.ALTER_CONFIGS; + case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE; + case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS; + case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS; + case ANY -> throw new IllegalArgumentException("ANY operation can be only part of filter"); + case UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN; + }; + } + + static KafkaAclResourceTypeDTO mapAclResourceType(ResourceType resourceType) { + return switch (resourceType) { + case CLUSTER -> KafkaAclResourceTypeDTO.CLUSTER; + case TOPIC -> KafkaAclResourceTypeDTO.TOPIC; + case GROUP -> KafkaAclResourceTypeDTO.GROUP; + case DELEGATION_TOKEN -> KafkaAclResourceTypeDTO.DELEGATION_TOKEN; + case TRANSACTIONAL_ID -> KafkaAclResourceTypeDTO.TRANSACTIONAL_ID; + case USER -> KafkaAclResourceTypeDTO.USER; + case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter"); + case UNKNOWN -> KafkaAclResourceTypeDTO.UNKNOWN; + }; + } + + static ResourceType mapAclResourceTypeDto(KafkaAclResourceTypeDTO dto) { + return ResourceType.valueOf(dto.name()); + } + + static PatternType mapPatternTypeDto(KafkaAclNamePatternTypeDTO dto) { + return PatternType.valueOf(dto.name()); + } + + static AclBinding toAclBinding(KafkaAclDTO dto) { + return new AclBinding( + new ResourcePattern( + mapAclResourceTypeDto(dto.getResourceType()), + dto.getResourceName(), + mapPatternTypeDto(dto.getNamePatternType()) + ), + new AccessControlEntry( + dto.getPrincipal(), + dto.getHost(), + AclOperation.valueOf(dto.getOperation().name()), + AclPermissionType.valueOf(dto.getPermission().name()) + ) + ); + } + + static KafkaAclDTO toKafkaAclDto(AclBinding binding) { + var pattern = binding.pattern(); + var filter = binding.toFilter().entryFilter(); + return new KafkaAclDTO() + .resourceType(mapAclResourceType(pattern.resourceType())) + .resourceName(pattern.name()) + .namePatternType(KafkaAclNamePatternTypeDTO.fromValue(pattern.patternType().name())) + .principal(filter.principal()) + .host(filter.host()) + .operation(mapAclOperation(filter.operation())) + .permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name())); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java index 9731492f001..2973e5500d9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java @@ -4,5 +4,7 @@ public enum ClusterFeature { KAFKA_CONNECT, KSQL_DB, SCHEMA_REGISTRY, - TOPIC_DELETION + TOPIC_DELETION, + KAFKA_ACL_VIEW, + KAFKA_ACL_EDIT } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java index 0c2587d681a..45858093a7f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.model.rbac; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; @@ -37,6 +38,8 @@ public class AccessContext { Collection ksqlActions; + Collection aclActions; + public static AccessContextBuilder builder() { return new AccessContextBuilder(); } @@ -55,6 +58,7 @@ public static final class AccessContextBuilder { private String schema; private Collection schemaActions = Collections.emptySet(); private Collection ksqlActions = Collections.emptySet(); + private Collection aclActions = Collections.emptySet(); private AccessContextBuilder() { } @@ -131,6 +135,12 @@ public AccessContextBuilder ksqlActions(KsqlAction... actions) { return this; } + public AccessContextBuilder aclActions(AclAction... actions) { + Assert.isTrue(actions.length > 0, "actions not present"); + this.aclActions = List.of(actions); + return this; + } + public AccessContext build() { return new AccessContext( applicationConfigActions, @@ -140,7 +150,7 @@ public AccessContext build() { connect, connectActions, connector, schema, schemaActions, - ksqlActions); + ksqlActions, aclActions); } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java index afdcf0ca155..16f01f60e6b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java @@ -4,6 +4,7 @@ import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.KSQL; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; @@ -76,6 +77,7 @@ private List getAllActionValues() { case SCHEMA -> Arrays.stream(SchemaAction.values()).map(Enum::toString).toList(); case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList(); case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList(); + case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList(); }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java index 4b2c66361f7..f71dfb29792 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java @@ -11,7 +11,8 @@ public enum Resource { CONSUMER, SCHEMA, CONNECT, - KSQL; + KSQL, + ACL; @Nullable public static Resource fromString(String name) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java new file mode 100644 index 00000000000..c86af7e72d2 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.model.rbac.permission; + +import org.apache.commons.lang3.EnumUtils; +import org.jetbrains.annotations.Nullable; + +public enum AclAction implements PermissibleAction { + + VIEW, + EDIT; + + @Nullable + public static AclAction fromString(String name) { + return EnumUtils.getEnum(AclAction.class, name); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index ec749abd14e..7ba3f036e93 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -2,16 +2,19 @@ import com.provectus.kafka.ui.model.ClusterFeature; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.Node; +import org.apache.kafka.common.acl.AclOperation; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,7 +29,7 @@ public class FeatureService { private final AdminClientService adminClientService; public Mono> getAvailableFeatures(KafkaCluster cluster, - ReactiveAdminClient.ClusterDescription clusterDescription) { + ClusterDescription clusterDescription) { List> features = new ArrayList<>(); if (Optional.ofNullable(cluster.getConnectsClients()) @@ -44,6 +47,8 @@ public Mono> getAvailableFeatures(KafkaCluster cluster, } features.add(topicDeletionEnabled(cluster, clusterDescription.getController())); + features.add(aclView(cluster)); + features.add(aclEdit(clusterDescription)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } @@ -65,4 +70,20 @@ private Mono topicDeletionEnabled(KafkaCluster cluster, @Nullabl ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty()); } + + private Mono aclEdit(ClusterDescription clusterDescription) { + var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of()); + boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER); + return canEdit + ? Mono.just(ClusterFeature.KAFKA_ACL_EDIT) + : Mono.empty(); + } + + private Mono aclView(KafkaCluster cluster) { + return adminClientService.get(cluster).flatMap( + ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED) + ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW) + : Mono.empty() + ); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 39332da39ee..8451a89f97d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -5,6 +5,7 @@ import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableTable; import com.google.common.collect.Iterables; import com.google.common.collect.Table; @@ -15,7 +16,6 @@ import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant; import java.io.Closeable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -61,16 +61,22 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -82,25 +88,28 @@ @RequiredArgsConstructor public class ReactiveAdminClient implements Closeable { - private enum SupportedFeature { + public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), CONFIG_DOCUMENTATION_RETRIEVAL(2.6f), - DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f); + DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f), + AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled); - private final float sinceVersion; + private final BiFunction> predicate; - SupportedFeature(float sinceVersion) { - this.sinceVersion = sinceVersion; + SupportedFeature(BiFunction> predicate) { + this.predicate = predicate; } - static Set forVersion(float kafkaVersion) { - return Arrays.stream(SupportedFeature.values()) - .filter(f -> kafkaVersion >= f.sinceVersion) - .collect(Collectors.toSet()); + SupportedFeature(float fromVersion) { + this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion); } - static Set defaultFeatures() { - return Set.of(); + static Mono> forVersion(AdminClient ac, @Nullable Float kafkaVersion) { + return Flux.fromArray(SupportedFeature.values()) + .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled))) + .filter(Tuple2::getT2) + .map(Tuple2::getT1) + .collect(Collectors.toSet()); } } @@ -110,25 +119,31 @@ public static class ClusterDescription { Node controller; String clusterId; Collection nodes; + @Nullable // null, if ACL is disabled Set authorizedOperations; } public static Mono create(AdminClient adminClient) { return getClusterVersion(adminClient) - .map(ver -> - new ReactiveAdminClient( - adminClient, - ver, - getSupportedUpdateFeaturesForVersion(ver))); - } - - private static Set getSupportedUpdateFeaturesForVersion(String versionStr) { - try { - float version = KafkaVersion.parse(versionStr); - return SupportedFeature.forVersion(version); - } catch (NumberFormatException e) { - return SupportedFeature.defaultFeatures(); - } + .flatMap(ver -> + getSupportedUpdateFeaturesForVersion(adminClient, ver) + .map(features -> + new ReactiveAdminClient(adminClient, ver, features))); + } + + private static Mono> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) { + @Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null); + return SupportedFeature.forVersion(ac, kafkaVersion); + } + + private static Mono isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) { + return toMono(ac.describeAcls(AclBindingFilter.ANY).values()) + .thenReturn(true) + .doOnError(th -> !(th instanceof SecurityDisabledException) + && !(th instanceof InvalidRequestException) + && !(th instanceof UnsupportedVersionException), + th -> log.warn("Error checking if security enabled", th)) + .onErrorReturn(false); } // NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results @@ -162,6 +177,10 @@ public static Mono toMono(KafkaFuture future) { private final String version; private final Set features; + public Set getClusterFeatures() { + return features; + } + public Mono> listTopics(boolean listInternal) { return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names()); } @@ -576,6 +595,22 @@ Mono> listOffsetsUnsafe(Collection par ); } + public Mono> listAcls(ResourcePatternFilter filter) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values()); + } + + public Mono createAcls(Collection aclBindings) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + return toMono(client.createAcls(aclBindings).all()); + } + + public Mono deleteAcls(Collection aclBindings) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet()); + return toMono(client.deleteAcls(filters).all()).then(); + } + public Mono updateBrokerConfigByName(Integer brokerId, String name, String value) { ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java new file mode 100644 index 00000000000..673b17ee1f8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java @@ -0,0 +1,81 @@ +package com.provectus.kafka.ui.service.acl; + +import com.provectus.kafka.ui.exception.ValidationException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; + +public class AclCsv { + + private static final String LINE_SEPARATOR = System.lineSeparator(); + private static final String VALUES_SEPARATOR = ","; + private static final String HEADER = "Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host"; + + public static String transformToCsvString(Collection acls) { + return Stream.concat(Stream.of(HEADER), acls.stream().map(AclCsv::createAclString)) + .collect(Collectors.joining(System.lineSeparator())); + } + + public static String createAclString(AclBinding binding) { + var pattern = binding.pattern(); + var filter = binding.toFilter().entryFilter(); + return String.format( + "%s,%s,%s,%s,%s,%s,%s", + filter.principal(), + pattern.resourceType(), + pattern.patternType(), + pattern.name(), + filter.operation(), + filter.permissionType(), + filter.host() + ); + } + + private static AclBinding parseCsvLine(String csv, int line) { + String[] values = csv.split(VALUES_SEPARATOR); + if (values.length != 7) { + throw new ValidationException("Input csv is not valid - there should be 7 columns in line " + line); + } + for (int i = 0; i < values.length; i++) { + if ((values[i] = values[i].trim()).isBlank()) { + throw new ValidationException("Input csv is not valid - blank value in colum " + i + ", line " + line); + } + } + try { + return new AclBinding( + new ResourcePattern( + ResourceType.valueOf(values[1]), values[3], PatternType.valueOf(values[2])), + new AccessControlEntry( + values[0], values[6], AclOperation.valueOf(values[4]), AclPermissionType.valueOf(values[5])) + ); + } catch (IllegalArgumentException enumParseError) { + throw new ValidationException("Error parsing enum value in line " + line); + } + } + + public static Collection parseCsv(String csvString) { + String[] lines = csvString.split(LINE_SEPARATOR); + if (lines.length == 0) { + throw new ValidationException("Error parsing ACL csv file: no lines in file"); + } + boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", "")); + Set result = new HashSet<>(); + for (int i = firstLineIsHeader ? 1 : 0; i < lines.length; i++) { + String line = lines[i]; + if (!line.isBlank()) { + AclBinding aclBinding = parseCsvLine(line, i); + result.add(aclBinding); + } + } + return result; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java new file mode 100644 index 00000000000..8c5a8dab06f --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java @@ -0,0 +1,93 @@ +package com.provectus.kafka.ui.service.acl; + +import com.google.common.collect.Sets; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import java.util.List; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AclsService { + + private final AdminClientService adminClientService; + + public Mono createAcl(KafkaCluster cluster, AclBinding aclBinding) { + var aclString = AclCsv.createAclString(aclBinding); + log.info("CREATING ACL: [{}]", aclString); + return adminClientService.get(cluster) + .flatMap(ac -> ac.createAcls(List.of(aclBinding))) + .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString)); + } + + public Mono deleteAcl(KafkaCluster cluster, AclBinding aclBinding) { + var aclString = AclCsv.createAclString(aclBinding); + log.info("DELETING ACL: [{}]", aclString); + return adminClientService.get(cluster) + .flatMap(ac -> ac.deleteAcls(List.of(aclBinding))) + .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString)); + } + + public Flux listAcls(KafkaCluster cluster, ResourcePatternFilter filter) { + return adminClientService.get(cluster) + .flatMap(c -> c.listAcls(filter)) + .flatMapIterable(acls -> acls); + } + + public Mono getAclAsCsvString(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(c -> c.listAcls(ResourcePatternFilter.ANY)) + .map(AclCsv::transformToCsvString); + } + + public Mono syncAclWithAclCsv(KafkaCluster cluster, String csv) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> { + var existingSet = Set.copyOf(existingAclList); + var newAcls = Set.copyOf(AclCsv.parseCsv(csv)); + var toDelete = Sets.difference(existingSet, newAcls); + var toAdd = Sets.difference(newAcls, existingSet); + logAclSyncPlan(cluster, toAdd, toDelete); + if (toAdd.isEmpty() && toDelete.isEmpty()) { + return Mono.empty(); + } + log.info("Starting new ACLs creation"); + return ac.createAcls(toAdd) + .doOnSuccess(v -> { + log.info("{} new ACLs created", toAdd.size()); + log.info("Starting ACLs deletion"); + }) + .then(ac.deleteAcls(toDelete) + .doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size()))); + })); + } + + private void logAclSyncPlan(KafkaCluster cluster, Set toBeAdded, Set toBeDeleted) { + log.info("'{}' cluster ACL sync plan: ", cluster.getName()); + if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) { + log.info("Nothing to do, ACL is already in sync"); + return; + } + if (!toBeAdded.isEmpty()) { + log.info("ACLs to be added ({}): ", toBeAdded.size()); + for (AclBinding aclBinding : toBeAdded) { + log.info(" " + AclCsv.createAclString(aclBinding)); + } + } + if (!toBeDeleted.isEmpty()) { + log.info("ACLs to be deleted ({}): ", toBeDeleted.size()); + for (AclBinding aclBinding : toBeDeleted) { + log.info(" " + AclCsv.createAclString(aclBinding)); + } + } + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java index 06304365c79..fa84fc361c4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java @@ -61,7 +61,9 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory { } catch (Exception e) { log.error("----------------------------------"); log.error("SSL can't be enabled for JMX retrieval. " - + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg.", e); + + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}", + e.getMessage()); + log.trace("SSL can't be enabled for JMX retrieval", e); log.error("----------------------------------"); } SSL_JMX_SUPPORTED = sslJmxSupported; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java index 5ed21c6a6e2..3d6b2ca40ee 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java @@ -1,24 +1,21 @@ package com.provectus.kafka.ui.util; -import lombok.extern.slf4j.Slf4j; +import java.util.Optional; -@Slf4j public final class KafkaVersion { private KafkaVersion() { } - public static float parse(String version) throws NumberFormatException { - log.trace("Parsing cluster version [{}]", version); + public static Optional parse(String version) throws NumberFormatException { try { final String[] parts = version.split("\\."); if (parts.length > 2) { version = parts[0] + "." + parts[1]; } - return Float.parseFloat(version.split("-")[0]); + return Optional.of(Float.parseFloat(version.split("-")[0])); } catch (Exception e) { - log.error("Conversion clusterVersion [{}] to float value failed", version, e); - throw e; + return Optional.empty(); } } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java new file mode 100644 index 00000000000..08ca4d15073 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java @@ -0,0 +1,70 @@ +package com.provectus.kafka.ui.service.acl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.provectus.kafka.ui.exception.ValidationException; +import java.util.Collection; +import java.util.List; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class AclCsvTest { + + private static final List TEST_BINDINGS = List.of( + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), + new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)), + new AclBinding( + new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED), + new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)) + ); + + @ParameterizedTest + @ValueSource(strings = { + "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n" + + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost", + + //without header + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "\n" + + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost" + + "\n" + }) + void parsesValidInputCsv(String csvString) { + Collection parsed = AclCsv.parseCsv(csvString); + assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS); + } + + @ParameterizedTest + @ValueSource(strings = { + // columns > 7 + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*,1,2,3,4", + // columns < 7 + "User:test1,TOPIC,LITERAL,*", + // enum values are illegal + "User:test1,ILLEGAL,LITERAL,*,READ,ALLOW,*", + "User:test1,TOPIC,LITERAL,*,READ,ILLEGAL,*" + }) + void throwsExceptionForInvalidInputCsv(String csvString) { + assertThatThrownBy(() -> AclCsv.parseCsv(csvString)) + .isInstanceOf(ValidationException.class); + } + + @Test + void transformAndParseUseSameFormat() { + String csv = AclCsv.transformToCsvString(TEST_BINDINGS); + Collection parsedBindings = AclCsv.parseCsv(csv); + assertThat(parsedBindings).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java new file mode 100644 index 00000000000..5791bb20414 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java @@ -0,0 +1,82 @@ +package com.provectus.kafka.ui.service.acl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.Collection; +import java.util.List; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import reactor.core.publisher.Mono; + +class AclsServiceTest { + + private static final KafkaCluster CLUSTER = KafkaCluster.builder().build(); + + private final ReactiveAdminClient adminClientMock = mock(ReactiveAdminClient.class); + private final AdminClientService adminClientService = mock(AdminClientService.class); + + private final AclsService aclsService = new AclsService(adminClientService); + + @BeforeEach + void initMocks() { + when(adminClientService.get(CLUSTER)).thenReturn(Mono.just(adminClientMock)); + } + + @Test + void testSyncAclWithAclCsv() { + var existingBinding1 = new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), + new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)); + + var existingBinding2 = new AclBinding( + new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED), + new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)); + + var newBindingToBeAdded = new AclBinding( + new ResourcePattern(ResourceType.GROUP, "groupNew", PatternType.PREFIXED), + new AccessControlEntry("User:test3", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)); + + when(adminClientMock.listAcls(ResourcePatternFilter.ANY)) + .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2))); + + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + ArgumentCaptor deletedCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.deleteAcls((Collection) deletedCaptor.capture())) + .thenReturn(Mono.empty()); + + aclsService.syncAclWithAclCsv( + CLUSTER, + "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n" + + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost" + ).block(); + + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(1) + .contains(newBindingToBeAdded); + + Collection deletedBindings = (Collection) deletedCaptor.getValue(); + assertThat(deletedBindings) + .hasSize(1) + .contains(existingBinding2); + } + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index b589198b5a1..b89f8d09635 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1730,6 +1730,125 @@ paths: 404: description: Not found + /api/clusters/{clusterName}/acls: + get: + tags: + - Acls + summary: listKafkaAcls + operationId: listAcls + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: resourceType + in: query + required: false + schema: + $ref: '#/components/schemas/KafkaAclResourceType' + - name: resourceName + in: query + required: false + schema: + type: string + - name: namePatternType + in: query + required: false + schema: + $ref: '#/components/schemas/KafkaAclNamePatternType' + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/KafkaAcl' + + /api/clusters/{clusterName}/acl/csv: + get: + tags: + - Acls + summary: getAclAsCsv + operationId: getAclAsCsv + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + text/plain: + schema: + type: string + post: + tags: + - Acls + summary: syncAclsCsv + operationId: syncAclsCsv + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + text/plain: + schema: + type: string + responses: + 200: + description: OK + + /api/clusters/{clusterName}/acl: + post: + tags: + - Acls + summary: createAcl + operationId: createAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KafkaAcl' + responses: + 200: + description: OK + + delete: + tags: + - Acls + summary: deleteAcl + operationId: deleteAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KafkaAcl' + responses: + 200: + description: OK + 404: + description: Acl not found + /api/authorization: get: tags: @@ -1972,6 +2091,8 @@ components: - KAFKA_CONNECT - KSQL_DB - TOPIC_DELETION + - KAFKA_ACL_VIEW # get ACLs listing + - KAFKA_ACL_EDIT # create & delete ACLs required: - id - name @@ -3342,6 +3463,62 @@ components: - SCHEMA - CONNECT - KSQL + - ACL + + KafkaAcl: + type: object + required: [resourceType, resourceName, namePatternType, principal, host, operation, permission] + properties: + resourceType: + $ref: '#/components/schemas/KafkaAclResourceType' + resourceName: + type: string # "*" if acl can be applied to any resource of given type + namePatternType: + $ref: '#/components/schemas/KafkaAclNamePatternType' + principal: + type: string + host: + type: string # "*" if acl can be applied to any resource of given type + operation: + type: string + enum: + - UNKNOWN # Unknown operation, need to update mapping code on BE + - ALL # Cluster, Topic, Group + - READ # Topic, Group + - WRITE # Topic, TransactionalId + - CREATE # Cluster, Topic + - DELETE # Topic, Group + - ALTER # Cluster, Topic, + - DESCRIBE # Cluster, Topic, Group, TransactionalId, DelegationToken + - CLUSTER_ACTION # Cluster + - DESCRIBE_CONFIGS # Cluster, Topic + - ALTER_CONFIGS # Cluster, Topic + - IDEMPOTENT_WRITE # Cluster + - CREATE_TOKENS + - DESCRIBE_TOKENS + permission: + type: string + enum: + - ALLOW + - DENY + + KafkaAclResourceType: + type: string + enum: + - UNKNOWN # Unknown operation, need to update mapping code on BE + - TOPIC + - GROUP + - CLUSTER + - TRANSACTIONAL_ID + - DELEGATION_TOKEN + - USER + + KafkaAclNamePatternType: + type: string + enum: + - MATCH + - LITERAL + - PREFIXED RestartRequest: type: object