diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java new file mode 100644 index 00000000000..8be2cddcc13 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -0,0 +1,105 @@ +package com.provectus.kafka.ui.controller; + +import static java.util.stream.Collectors.toMap; + +import com.provectus.kafka.ui.api.ClientQuotasApi; +import com.provectus.kafka.ui.model.ClientQuotasDTO; +import com.provectus.kafka.ui.model.rbac.AccessContext; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; +import com.provectus.kafka.ui.service.quota.ClientQuotaRecord; +import com.provectus.kafka.ui.service.quota.ClientQuotaService; +import java.math.BigDecimal; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class ClientQuotasController extends AbstractController implements ClientQuotasApi { + + private static final Comparator QUOTA_RECORDS_COMPARATOR = + Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user)) + .thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::clientId))) + .thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::ip))); + + private final ClientQuotaService clientQuotaService; + + @Override + public Mono>> listQuotas(String clusterName, + ServerWebExchange exchange) { + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("listClientQuotas") + .clientQuotaActions(ClientQuotaAction.VIEW) + .build(); + + Mono>> operation = + Mono.just( + clientQuotaService.getAll(getCluster(clusterName)) + .sort(QUOTA_RECORDS_COMPARATOR) + .map(this::mapToDto) + ).map(ResponseEntity::ok); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); + } + + @Override + public Mono> upsertClientQuotas(String clusterName, + Mono quotasDto, + ServerWebExchange exchange) { + return quotasDto.flatMap( + newQuotas -> { + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("upsertClientQuotas") + .operationParams(Map.of("newQuotas", newQuotas)) + .clientQuotaActions(ClientQuotaAction.EDIT) + .build(); + + Mono> operation = clientQuotaService.upsert( + getCluster(clusterName), + newQuotas.getUser(), + newQuotas.getClientId(), + newQuotas.getIp(), + Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) + .entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) + ) + .map(statusCode -> ResponseEntity.status(statusCode).build()); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); + } + ); + } + + private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) { + return new ClientQuotasDTO() + .user(quotaRecord.user()) + .clientId(quotaRecord.clientId()) + .ip(quotaRecord.ip()) + .quotas( + quotaRecord.quotas().entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .collect(toMap( + Map.Entry::getKey, + e -> BigDecimal.valueOf(e.getValue()), + (v1, v2) -> null, //won't be called + LinkedHashMap::new //to keep order + )) + ); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java index 2973e5500d9..fec21b40dbf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java @@ -6,5 +6,6 @@ public enum ClusterFeature { SCHEMA_REGISTRY, TOPIC_DELETION, KAFKA_ACL_VIEW, - KAFKA_ACL_EDIT + KAFKA_ACL_EDIT, + CLIENT_QUOTA_MANAGEMENT } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java index cf126bf3dfe..75560fddb4b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java @@ -3,6 +3,7 @@ import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.AuditAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -44,6 +45,8 @@ public class AccessContext { Collection auditAction; + Collection clientQuotaActions; + String operationName; Object operationParams; @@ -67,6 +70,7 @@ public static final class AccessContextBuilder { private Collection ksqlActions = Collections.emptySet(); private Collection aclActions = Collections.emptySet(); private Collection auditActions = Collections.emptySet(); + private Collection clientQuotaActions = Collections.emptySet(); private String operationName; private Object operationParams; @@ -158,6 +162,12 @@ public AccessContextBuilder auditActions(AuditAction... actions) { return this; } + public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) { + Assert.isTrue(actions.length > 0, "actions not present"); + this.clientQuotaActions = List.of(actions); + return this; + } + public AccessContextBuilder operationName(String operationName) { this.operationName = operationName; return this; @@ -182,7 +192,7 @@ public AccessContext build() { connect, connectActions, connector, schema, schemaActions, - ksqlActions, aclActions, auditActions, + ksqlActions, aclActions, auditActions, clientQuotaActions, operationName, operationParams); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java index 56b0a098026..95810cbeda0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java @@ -3,12 +3,14 @@ import static com.provectus.kafka.ui.model.rbac.Resource.ACL; import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT; +import static com.provectus.kafka.ui.model.rbac.Resource.CLIENT_QUOTAS; import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.KSQL; import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.AuditAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -32,7 +34,7 @@ public class Permission { private static final List RBAC_ACTION_EXEMPT_LIST = - List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT); + List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT, CLIENT_QUOTAS); Resource resource; List actions; @@ -88,6 +90,7 @@ private List getAllActionValues() { case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList(); case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList(); case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList(); + case CLIENT_QUOTAS -> Arrays.stream(ClientQuotaAction.values()).map(Enum::toString).toList(); }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java index ca2efab3a9a..f724c4326f1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java @@ -13,7 +13,8 @@ public enum Resource { CONNECT, KSQL, ACL, - AUDIT; + AUDIT, + CLIENT_QUOTAS; @Nullable public static Resource fromString(String name) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java new file mode 100644 index 00000000000..b2ff994d0d0 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java @@ -0,0 +1,19 @@ +package com.provectus.kafka.ui.model.rbac.permission; + +import java.util.Set; + +public enum ClientQuotaAction implements PermissibleAction { + + VIEW, + EDIT + + ; + + public static final Set ALTER_ACTIONS = Set.of(EDIT); + + @Override + public boolean isAlter() { + return ALTER_ACTIONS.contains(this); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java index 24c4adba9f0..b2d740cff2c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java @@ -4,7 +4,7 @@ public sealed interface PermissibleAction permits AclAction, ApplicationConfigAction, ConsumerGroupAction, SchemaAction, ConnectAction, ClusterConfigAction, - KsqlAction, TopicAction, AuditAction { + KsqlAction, TopicAction, AuditAction, ClientQuotaAction { String name(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index b08691aef5a..f51054b5c58 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -1,5 +1,7 @@ package com.provectus.kafka.ui.service; +import static com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT; + import com.provectus.kafka.ui.model.ClusterFeature; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; @@ -41,6 +43,7 @@ public Mono> getAvailableFeatures(ReactiveAdminClient admin features.add(topicDeletionEnabled(adminClient)); features.add(aclView(adminClient)); features.add(aclEdit(adminClient, clusterDescription)); + features.add(quotaManagement(adminClient)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } @@ -70,4 +73,10 @@ private boolean aclViewEnabled(ReactiveAdminClient adminClient) { return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED); } + private Mono quotaManagement(ReactiveAdminClient adminClient) { + return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT) + ? Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT) + : Mono.empty(); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 9de908efa7f..70c1e3ff1cd 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -77,6 +77,9 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; @@ -94,7 +97,8 @@ public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), CONFIG_DOCUMENTATION_RETRIEVAL(2.6f), DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f), - AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled); + AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled), + CLIENT_QUOTA_MANAGEMENT(2.6f); private final BiFunction> predicate; @@ -658,6 +662,14 @@ public Mono alterReplicaLogDirs(Map replica return toMono(client.alterReplicaLogDirs(replicaAssignment).all()); } + public Mono>> getClientQuotas(ClientQuotaFilter filter) { + return toMono(client.describeClientQuotas(filter).entities()); + } + + public Mono alterClientQuota(ClientQuotaAlteration alteration) { + return toMono(client.alterClientQuotas(List.of(alteration)).all()); + } + private Mono incrementalAlterConfig(String topicName, List currentConfigs, Map newConfigs) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java index 3f7fb44aacd..870da8668df 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java @@ -65,6 +65,8 @@ static List getAccessedResources(AccessContext ctx) { .forEach(a -> resources.add(create(a, Resource.ACL, null))); ctx.getAuditAction() .forEach(a -> resources.add(create(a, Resource.AUDIT, null))); + ctx.getClientQuotaActions() + .forEach(a -> resources.add(create(a, Resource.CLIENT_QUOTAS, null))); return resources; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java new file mode 100644 index 00000000000..a348b2a1ea9 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.service.quota; + +import jakarta.annotation.Nullable; +import java.util.Map; +import org.apache.kafka.common.quota.ClientQuotaEntity; + +public record ClientQuotaRecord(@Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map quotas) { + + static ClientQuotaRecord create(ClientQuotaEntity entity, Map quotas) { + return new ClientQuotaRecord( + entity.entries().get(ClientQuotaEntity.USER), + entity.entries().get(ClientQuotaEntity.CLIENT_ID), + entity.entries().get(ClientQuotaEntity.IP), + quotas + ); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java new file mode 100644 index 00000000000..679f7b9381d --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -0,0 +1,103 @@ +package com.provectus.kafka.ui.service.quota; + +import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID; +import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; +import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; + +import com.google.common.collect.Sets; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import jakarta.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.apache.kafka.common.quota.ClientQuotaFilterComponent; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +public class ClientQuotaService { + + private final AdminClientService adminClientService; + + public Flux getAll(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) + .flatMapIterable(Map::entrySet) + .map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())); + } + + //returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted + public Mono upsert(KafkaCluster cluster, + @Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map newQuotas) { + ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip); + return adminClientService.get(cluster) + .flatMap(ac -> + findQuotas(ac, quotaEntity) + .flatMap(currentQuotas -> { + HttpStatus result = HttpStatus.OK; //updated + if (newQuotas.isEmpty()) { + result = HttpStatus.NO_CONTENT; //deleted + } else if (currentQuotas.isEmpty()) { + result = HttpStatus.CREATED; + } + var alteration = createAlteration(quotaEntity, currentQuotas, newQuotas); + return ac.alterClientQuota(alteration) + .thenReturn(result); + }) + ); + } + + private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { + if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { + throw new ValidationException("Quota entity id is not set"); + } + var id = new HashMap(); + Optional.ofNullable(user).ifPresent(u -> id.put(USER, u)); + Optional.ofNullable(clientId).ifPresent(cid -> id.put(CLIENT_ID, cid)); + Optional.ofNullable(ip).ifPresent(i -> id.put(IP, i)); + return new ClientQuotaEntity(id); + } + + private ClientQuotaAlteration createAlteration(ClientQuotaEntity quotaEntity, + Map currentQuotas, + Map newQuotas) { + Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); + List ops = Stream.concat( + quotasToClear.stream() + .map(name -> new ClientQuotaAlteration.Op(name, null)), //setting null value to clear current state + newQuotas.entrySet().stream() + .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) + ).toList(); + return new ClientQuotaAlteration(quotaEntity, ops); + } + + // returns empty map if no quotas found for an entity + private Mono> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) { + return ac.getClientQuotas(crateSearchFilter(quotaEntity)) + .map(found -> Optional.ofNullable(found.get(quotaEntity)).orElse(Map.of())); + } + + private ClientQuotaFilter crateSearchFilter(ClientQuotaEntity quotaEntity) { + List filters = new ArrayList<>(); + quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name))); + return ClientQuotaFilter.contains(filters); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java index 59ea02fea84..69a851ded6d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java @@ -123,7 +123,8 @@ && isConnectorAccessible(context, user) // TODO connector selectors && isSchemaAccessible(context, user) && isKsqlAccessible(context, user) && isAclAccessible(context, user) - && isAuditAccessible(context, user); + && isAuditAccessible(context, user) + && isClientQuotaAccessible(context, user); if (!accessGranted) { throw new AccessDeniedException(ACCESS_DENIED); @@ -417,6 +418,23 @@ private boolean isAuditAccessible(AccessContext context, AuthenticatedUser user) return isAccessible(Resource.AUDIT, null, user, context, requiredActions); } + private boolean isClientQuotaAccessible(AccessContext context, AuthenticatedUser user) { + if (!rbacEnabled) { + return true; + } + + if (context.getClientQuotaActions().isEmpty()) { + return true; + } + + Set requiredActions = context.getClientQuotaActions() + .stream() + .map(a -> a.toString().toUpperCase()) + .collect(Collectors.toSet()); + + return isAccessible(Resource.CLIENT_QUOTAS, null, user, context, requiredActions); + } + public Set getOauthExtractors() { return oauthExtractors; } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java index 5bcee45ac8e..4213e888d3f 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java @@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder; import com.provectus.kafka.ui.model.rbac.permission.AclAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -55,9 +56,11 @@ static Stream onlyLogsWhenAlterOperationIsPresentForOneOfResource SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a)); Stream> connEditActions = ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a)); + Stream> quotaEditActions = + ClientQuotaAction.ALTER_ACTIONS.stream().map(a -> c -> c.clientQuotaActions(a)); return Stream.of( topicEditActions, clusterConfigEditActions, aclEditActions, - cgEditActions, connEditActions, schemaEditActions + cgEditActions, connEditActions, schemaEditActions, quotaEditActions ) .flatMap(c -> c) .map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); @@ -78,7 +81,8 @@ static Stream doesNothingIfNoResourceHasAlterAction() { c -> c.aclActions(AclAction.VIEW), c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW), c -> c.schema("sc").schemaActions(SchemaAction.VIEW), - c -> c.connect("conn").connectActions(ConnectAction.VIEW) + c -> c.connect("conn").connectActions(ConnectAction.VIEW), + c -> c.clientQuotaActions(ClientQuotaAction.VIEW) ).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java new file mode 100644 index 00000000000..2f04008dd46 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -0,0 +1,78 @@ +package com.provectus.kafka.ui.service.quota; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.AbstractIntegrationTest; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.ClustersStorage; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.springframework.beans.factory.annotation.Autowired; +import reactor.test.StepVerifier; + +class ClientQuotaServiceTest extends AbstractIntegrationTest { + + @Autowired + ClientQuotaService quotaService; + + private KafkaCluster cluster; + + @BeforeEach + void init() { + cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).get(); + } + + @ParameterizedTest + @CsvSource( + value = { + "testUser, null, null ", + "null, testUserId, null", + "testUser2, testUserId2, null", + }, + nullValues = "null" + ) + void createUpdateDelete(String user, String clientId, String ip) { + var initialQuotas = Map.of( + "producer_byte_rate", 123.0, + "consumer_byte_rate", 234.0, + "request_percentage", 10.0 + ); + + //creating new + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, initialQuotas) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(201)) + .verifyComplete(); + + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, initialQuotas))) + .isTrue(); + + //updating + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 22222.0)) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(200)) + .verifyComplete(); + + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) + .isTrue(); + + //deleting created record + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, Map.of()) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(204)) + .verifyComplete(); + + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) + .isFalse(); + } + + private boolean quotaRecordExists(ClientQuotaRecord rec) { + return quotaService.getAll(cluster).collectList().block().contains(rec); + } + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 306c3fd2ddd..fc541496e77 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1910,6 +1910,55 @@ paths: 200: description: OK + /api/clusters/{clusterName}/clientquotas: + get: + tags: + - ClientQuotas + summary: listQuotas + operationId: listQuotas + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ClientQuotas' + post: + tags: + - ClientQuotas + summary: upsertClientQuotas + operationId: upsertClientQuotas + description: | + - updates/creates client quota record if `quotas` field is non-empty + - deletes client quota record if `quotas` field is null or empty + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ClientQuotas' + responses: + 200: + description: Existing quota updated + 201: + description: New quota created + 204: + description: Existing quota deleted + + /api/clusters/{clusterName}/acl/streamApp: post: tags: @@ -2175,6 +2224,7 @@ components: - TOPIC_DELETION - KAFKA_ACL_VIEW # get ACLs listing - KAFKA_ACL_EDIT # create & delete ACLs + - CLIENT_QUOTA_MANAGEMENT required: - id - name @@ -3600,6 +3650,7 @@ components: - KSQL - ACL - AUDIT + - CLIENT_QUOTAS KafkaAcl: type: object @@ -3701,6 +3752,20 @@ components: nullable: false type: string + ClientQuotas: + type: object + properties: + user: + type: string + clientId: + type: string + ip: + type: string + quotas: + type: object + additionalProperties: + type: number + KafkaAclResourceType: type: string enum: