Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Client Quotas Management API #4112

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.provectus.kafka.ui.controller;

import static java.util.stream.Collectors.toMap;

import com.provectus.kafka.ui.api.ClientQuotasApi;
import com.provectus.kafka.ui.model.ClientQuotasDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
import com.provectus.kafka.ui.service.quota.ClientQuotaRecord;
import com.provectus.kafka.ui.service.quota.ClientQuotaService;
import java.math.BigDecimal;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class ClientQuotasController extends AbstractController implements ClientQuotasApi {

private static final Comparator<ClientQuotaRecord> QUOTA_RECORDS_COMPARATOR =
Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user))
.thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::clientId)))
.thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::ip)));

private final ClientQuotaService clientQuotaService;

@Override
public Mono<ResponseEntity<Flux<ClientQuotasDTO>>> listQuotas(String clusterName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("listClientQuotas")
.clientQuotaActions(ClientQuotaAction.VIEW)
.build();

Mono<ResponseEntity<Flux<ClientQuotasDTO>>> operation =
Mono.just(
clientQuotaService.getAll(getCluster(clusterName))
.sort(QUOTA_RECORDS_COMPARATOR)
.map(this::mapToDto)
).map(ResponseEntity::ok);

return validateAccess(context)
.then(operation)
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> upsertClientQuotas(String clusterName,
Mono<ClientQuotasDTO> quotasDto,
ServerWebExchange exchange) {
return quotasDto.flatMap(
newQuotas -> {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("upsertClientQuotas")
.operationParams(Map.of("newQuotas", newQuotas))
.clientQuotaActions(ClientQuotaAction.EDIT)
.build();

Mono<ResponseEntity<Void>> operation = clientQuotaService.upsert(
getCluster(clusterName),
newQuotas.getUser(),
newQuotas.getClientId(),
newQuotas.getIp(),
Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of())
.entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue()))
)
.map(statusCode -> ResponseEntity.status(statusCode).build());

return validateAccess(context)
.then(operation)
.doOnEach(sig -> audit(context, sig));
}
);
}

private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) {
return new ClientQuotasDTO()
.user(quotaRecord.user())
.clientId(quotaRecord.clientId())
.ip(quotaRecord.ip())
.quotas(
quotaRecord.quotas().entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.collect(toMap(
Map.Entry::getKey,
e -> BigDecimal.valueOf(e.getValue()),
(v1, v2) -> null, //won't be called
LinkedHashMap::new //to keep order
))
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ public enum ClusterFeature {
SCHEMA_REGISTRY,
TOPIC_DELETION,
KAFKA_ACL_VIEW,
KAFKA_ACL_EDIT
KAFKA_ACL_EDIT,
CLIENT_QUOTA_MANAGEMENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
Expand Down Expand Up @@ -44,6 +45,8 @@ public class AccessContext {

Collection<AuditAction> auditAction;

Collection<ClientQuotaAction> clientQuotaActions;

String operationName;
Object operationParams;

Expand All @@ -67,6 +70,7 @@ public static final class AccessContextBuilder {
private Collection<KsqlAction> ksqlActions = Collections.emptySet();
private Collection<AclAction> aclActions = Collections.emptySet();
private Collection<AuditAction> auditActions = Collections.emptySet();
private Collection<ClientQuotaAction> clientQuotaActions = Collections.emptySet();

private String operationName;
private Object operationParams;
Expand Down Expand Up @@ -158,6 +162,12 @@ public AccessContextBuilder auditActions(AuditAction... actions) {
return this;
}

public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) {
Assert.isTrue(actions.length > 0, "actions not present");
this.clientQuotaActions = List.of(actions);
return this;
}

public AccessContextBuilder operationName(String operationName) {
this.operationName = operationName;
return this;
Expand All @@ -182,7 +192,7 @@ public AccessContext build() {
connect, connectActions,
connector,
schema, schemaActions,
ksqlActions, aclActions, auditActions,
ksqlActions, aclActions, auditActions, clientQuotaActions,
operationName, operationParams);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import static com.provectus.kafka.ui.model.rbac.Resource.ACL;
import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT;
import static com.provectus.kafka.ui.model.rbac.Resource.CLIENT_QUOTAS;
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;

import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
Expand All @@ -32,7 +34,7 @@
public class Permission {

private static final List<Resource> RBAC_ACTION_EXEMPT_LIST =
List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT);
List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT, CLIENT_QUOTAS);

Resource resource;
List<String> actions;
Expand Down Expand Up @@ -88,6 +90,7 @@ private List<String> getAllActionValues() {
case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList();
case CLIENT_QUOTAS -> Arrays.stream(ClientQuotaAction.values()).map(Enum::toString).toList();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public enum Resource {
CONNECT,
KSQL,
ACL,
AUDIT;
AUDIT,
CLIENT_QUOTAS;

@Nullable
public static Resource fromString(String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;

public enum ClientQuotaAction implements PermissibleAction {

VIEW,
EDIT

;

public static final Set<ClientQuotaAction> ALTER_ACTIONS = Set.of(EDIT);

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public sealed interface PermissibleAction permits
AclAction, ApplicationConfigAction,
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction {
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {

String name();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.provectus.kafka.ui.service;

import static com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT;

import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
Expand Down Expand Up @@ -41,6 +43,7 @@ public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient admin
features.add(topicDeletionEnabled(adminClient));
features.add(aclView(adminClient));
features.add(aclEdit(adminClient, clusterDescription));
features.add(quotaManagement(adminClient));

return Flux.fromIterable(features).flatMap(m -> m).collectList();
}
Expand Down Expand Up @@ -70,4 +73,10 @@ private boolean aclViewEnabled(ReactiveAdminClient adminClient) {
return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED);
}

private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT)
? Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT)
: Mono.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
Expand All @@ -94,7 +97,8 @@ public enum SupportedFeature {
INCREMENTAL_ALTER_CONFIGS(2.3f),
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled),
CLIENT_QUOTA_MANAGEMENT(2.6f);

private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;

Expand Down Expand Up @@ -658,6 +662,14 @@ public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replica
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
}

public Mono<Map<ClientQuotaEntity, Map<String, Double>>> getClientQuotas(ClientQuotaFilter filter) {
return toMono(client.describeClientQuotas(filter).entities());
}

public Mono<Void> alterClientQuota(ClientQuotaAlteration alteration) {
return toMono(client.alterClientQuotas(List.of(alteration)).all());
}

private Mono<Void> incrementalAlterConfig(String topicName,
List<ConfigEntry> currentConfigs,
Map<String, String> newConfigs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ static List<AuditResource> getAccessedResources(AccessContext ctx) {
.forEach(a -> resources.add(create(a, Resource.ACL, null)));
ctx.getAuditAction()
.forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
ctx.getClientQuotaActions()
.forEach(a -> resources.add(create(a, Resource.CLIENT_QUOTAS, null)));
return resources;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.provectus.kafka.ui.service.quota;

import jakarta.annotation.Nullable;
import java.util.Map;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public record ClientQuotaRecord(@Nullable String user,
@Nullable String clientId,
@Nullable String ip,
Map<String, Double> quotas) {

static ClientQuotaRecord create(ClientQuotaEntity entity, Map<String, Double> quotas) {
return new ClientQuotaRecord(
entity.entries().get(ClientQuotaEntity.USER),
entity.entries().get(ClientQuotaEntity.CLIENT_ID),
entity.entries().get(ClientQuotaEntity.IP),
quotas
);
}
}
Loading
Loading