From 3d3845a23bccb0d975b280ecf873d3347135f199 Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 14:38:44 +0400 Subject: [PATCH 01/10] wip --- documentation/compose/kafka-ui.yaml | 267 ++++++++++-------- .../kafka/ui/KafkaUiApplication.java | 8 +- .../ui/controller/ClientQuotasController.java | 68 +++++ .../kafka/ui/model/ClusterFeature.java | 3 +- .../kafka/ui/service/FeatureService.java | 9 + .../kafka/ui/service/ReactiveAdminClient.java | 14 +- .../ui/service/quota/ClientQuotaRecord.java | 20 ++ .../kafka/ui/service/quota/QuotaService.java | 39 +++ .../main/resources/swagger/kafka-ui-api.yaml | 59 ++++ 9 files changed, 360 insertions(+), 127 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java diff --git a/documentation/compose/kafka-ui.yaml b/documentation/compose/kafka-ui.yaml index 14a269ca7cb..bf558696859 100644 --- a/documentation/compose/kafka-ui.yaml +++ b/documentation/compose/kafka-ui.yaml @@ -8,146 +8,165 @@ services: ports: - 8080:8080 depends_on: - - kafka0 + - kafka2 - kafka1 - - schemaregistry0 - - schemaregistry1 - - kafka-connect0 environment: KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 - KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 - KAFKA_CLUSTERS_1_NAME: secondLocal - KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092 - KAFKA_CLUSTERS_1_METRICS_PORT: 9998 - KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085 - DYNAMIC_CONFIG_ENABLED: 'true' - kafka0: + kafka1: image: confluentinc/cp-kafka:7.2.1 - hostname: kafka0 - container_name: kafka0 - ports: - - "9092:9092" - - "9997:9997" + container_name: kafka1 environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9997 - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 - KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093' - KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093' + KAFKA_PROCESS_ROLES: 'broker,controller' volumes: - - ./scripts/update_run.sh:/tmp/update_run.sh + - ./scripts/update_run_cluster.sh:/tmp/update_run.sh command: "bash -c 'if [ ! 
-f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" - kafka1: + kafka2: image: confluentinc/cp-kafka:7.2.1 - hostname: kafka1 - container_name: kafka1 - ports: - - "9093:9092" - - "9998:9998" + container_name: kafka2 environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9998 - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998 - KAFKA_PROCESS_ROLES: 'broker,controller' - KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093' - KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_NODE_ID: 2 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092 KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093' + KAFKA_PROCESS_ROLES: 'broker,controller' volumes: - - ./scripts/update_run.sh:/tmp/update_run.sh + - ./scripts/update_run_cluster.sh:/tmp/update_run.sh command: "bash -c 'if [ ! 
-f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" - schemaregistry0: - image: confluentinc/cp-schema-registry:7.2.1 - ports: - - 8085:8085 - depends_on: - - kafka0 - environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 - SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT - SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 - SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 - - SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" - SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO - SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas +# kafka0: +# image: confluentinc/cp-kafka:7.2.1 +# hostname: kafka0 +# container_name: kafka0 +# ports: +# - "9092:9092" +# - "9997:9997" +# environment: +# KAFKA_BROKER_ID: 1 +# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' +# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092' +# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 +# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 +# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 +# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 +# KAFKA_JMX_PORT: 9997 +# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 +# KAFKA_PROCESS_ROLES: 'broker,controller' +# KAFKA_NODE_ID: 1 +# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093' +# KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092' +# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' +# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' +# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' +# volumes: +# - ./scripts/update_run.sh:/tmp/update_run.sh +# command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" +# +# kafka1: +# image: confluentinc/cp-kafka:7.2.1 +# hostname: kafka1 +# container_name: kafka1 +# ports: +# - "9093:9092" +# - "9998:9998" +# environment: +# KAFKA_BROKER_ID: 1 +# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' +# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092' +# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 +# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 +# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 +# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 +# KAFKA_JMX_PORT: 9998 +# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998 +# KAFKA_PROCESS_ROLES: 'broker,controller' +# KAFKA_NODE_ID: 1 +# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093' +# KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092' +# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' +# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' +# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' +# volumes: +# - ./scripts/update_run.sh:/tmp/update_run.sh +# command: "bash -c 'if [ ! 
-f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" - schemaregistry1: - image: confluentinc/cp-schema-registry:7.2.1 - ports: - - 18085:8085 - depends_on: - - kafka1 - environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 - SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT - SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 - SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 - - SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" - SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO - SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas - - kafka-connect0: - image: confluentinc/cp-kafka-connect:7.2.1 - ports: - - 8083:8083 - depends_on: - - kafka0 - - schemaregistry0 - environment: - CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 - CONNECT_GROUP_ID: compose-connect-group - CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_STATUS_STORAGE_TOPIC: _connect_status - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 - CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 - CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter - CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter - CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 - CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" - - kafka-init-topics: - image: confluentinc/cp-kafka:7.2.1 - volumes: - - ./data/message.json:/data/message.json - depends_on: - - kafka1 - command: "bash -c 'echo Waiting for Kafka to be ready... 
&& \ - cub kafka-ready -b kafka1:29092 1 30 && \ - kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ - kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ - kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \ - kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'" +# schemaregistry0: +# image: confluentinc/cp-schema-registry:7.2.1 +# ports: +# - 8085:8085 +# depends_on: +# - kafka0 +# environment: +# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 +# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT +# SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 +# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 +# +# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" +# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO +# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas +# +# schemaregistry1: +# image: confluentinc/cp-schema-registry:7.2.1 +# ports: +# - 18085:8085 +# depends_on: +# - kafka1 +# environment: +# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 +# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT +# SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 +# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 +# +# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" +# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO +# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas +# +# kafka-connect0: +# image: confluentinc/cp-kafka-connect:7.2.1 +# ports: +# - 8083:8083 +# depends_on: +# - kafka0 +# - schemaregistry0 +# environment: +# CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 +# CONNECT_GROUP_ID: compose-connect-group +# CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs +# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 +# CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset +# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 +# CONNECT_STATUS_STORAGE_TOPIC: _connect_status +# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 +# CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter +# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 +# CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter +# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 +# CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter +# CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter +# CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 +# CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" +# +# kafka-init-topics: +# image: confluentinc/cp-kafka:7.2.1 +# volumes: +# - ./data/message.json:/data/message.json +# depends_on: +# - kafka1 +# command: "bash -c 'echo Waiting for Kafka to be ready... 
&& \ +# cub kafka-ready -b kafka1:29092 1 30 && \ +# kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ +# kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ +# kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \ +# kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'" diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java index 8d0eafeff39..a7f0db04371 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java @@ -1,6 +1,9 @@ package com.provectus.kafka.ui; import com.provectus.kafka.ui.util.DynamicConfigOperations; +import java.util.Map; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -14,7 +17,10 @@ public class KafkaUiApplication { public static void main(String[] args) { - startApplication(args); + AdminClient ac = AdminClient.create(Map.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" + )); + System.out.println(ac); } public static ConfigurableApplicationContext startApplication(String[] args) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java new file mode 100644 index 00000000000..709e82bf94c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -0,0 +1,68 @@ +package com.provectus.kafka.ui.controller; + +import static java.util.stream.Collectors.toMap; + +import com.provectus.kafka.ui.api.ClientQuotasApi; +import com.provectus.kafka.ui.model.ClientQuotasDTO; +import com.provectus.kafka.ui.service.audit.AuditService; +import com.provectus.kafka.ui.service.quota.ClientQuotaRecord; +import com.provectus.kafka.ui.service.quota.QuotaService; +import com.provectus.kafka.ui.service.rbac.AccessControlService; +import java.math.BigDecimal; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class ClientQuotasController extends AbstractController implements ClientQuotasApi { + + private final QuotaService quotaService; + private final AccessControlService accessControlService; + private final AuditService auditService; + + @Override + public Mono>> listQuotas(String clusterName, + ServerWebExchange exchange) { + return Mono.just(quotaService.all(getCluster(clusterName)).map(this::map)) + .map(ResponseEntity::ok); + } + + @Override + public Mono> upsertClientQuotas(String clusterName, + Mono clientQuotasDTO, + ServerWebExchange exchange) 
{ + + return clientQuotasDTO.flatMap( + quotas -> + quotaService.upsert( + getCluster(clusterName), + quotas.getUser(), + quotas.getClientId(), + quotas.getIp(), + Optional.ofNullable(quotas.getQuotas()).orElse(Map.of()) + .entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) + ) + ).map(statusCode -> ResponseEntity.status(statusCode).build()); + } + + private ClientQuotasDTO map(ClientQuotaRecord quotaRecord) { + return new ClientQuotasDTO() + .user(quotaRecord.user()) + .clientId(quotaRecord.clientId()) + .ip(quotaRecord.ip()) + .quotas( + quotaRecord.quotas().entrySet().stream() + .collect(toMap(Map.Entry::getKey, e -> BigDecimal.valueOf(e.getValue()))) + ); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java index 2973e5500d9..fec21b40dbf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java @@ -6,5 +6,6 @@ public enum ClusterFeature { SCHEMA_REGISTRY, TOPIC_DELETION, KAFKA_ACL_VIEW, - KAFKA_ACL_EDIT + KAFKA_ACL_EDIT, + CLIENT_QUOTA_MANAGEMENT } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index b08691aef5a..f51054b5c58 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -1,5 +1,7 @@ package com.provectus.kafka.ui.service; +import static com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT; + import com.provectus.kafka.ui.model.ClusterFeature; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; @@ -41,6 +43,7 @@ public Mono> getAvailableFeatures(ReactiveAdminClient admin features.add(topicDeletionEnabled(adminClient)); features.add(aclView(adminClient)); features.add(aclEdit(adminClient, clusterDescription)); + features.add(quotaManagement(adminClient)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } @@ -70,4 +73,10 @@ private boolean aclViewEnabled(ReactiveAdminClient adminClient) { return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED); } + private Mono quotaManagement(ReactiveAdminClient adminClient) { + return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT) + ? 
Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT) + : Mono.empty(); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 9de908efa7f..70c1e3ff1cd 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -77,6 +77,9 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; @@ -94,7 +97,8 @@ public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), CONFIG_DOCUMENTATION_RETRIEVAL(2.6f), DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f), - AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled); + AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled), + CLIENT_QUOTA_MANAGEMENT(2.6f); private final BiFunction> predicate; @@ -658,6 +662,14 @@ public Mono alterReplicaLogDirs(Map replica return toMono(client.alterReplicaLogDirs(replicaAssignment).all()); } + public Mono>> getClientQuotas(ClientQuotaFilter filter) { + return toMono(client.describeClientQuotas(filter).entities()); + } + + public Mono alterClientQuota(ClientQuotaAlteration alteration) { + return toMono(client.alterClientQuotas(List.of(alteration)).all()); + } + private Mono incrementalAlterConfig(String topicName, List currentConfigs, Map newConfigs) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java new file mode 100644 index 00000000000..8dc0e49e453 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.service.quota; + +import jakarta.annotation.Nullable; +import java.util.Map; +import org.apache.kafka.common.quota.ClientQuotaEntity; + +public record ClientQuotaRecord(@Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map quotas) { + + static ClientQuotaRecord create(ClientQuotaEntity entity, Map qoutas) { + return new ClientQuotaRecord( + entity.entries().get(ClientQuotaEntity.USER), + entity.entries().get(ClientQuotaEntity.CLIENT_ID), + entity.entries().get(ClientQuotaEntity.IP), + qoutas + ); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java new file mode 100644 index 00000000000..3ff8ac5f691 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java @@ -0,0 +1,39 @@ +package com.provectus.kafka.ui.service.quota; + +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import jakarta.annotation.Nullable; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import 
lombok.RequiredArgsConstructor; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.springframework.http.HttpStatusCode; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +public class QuotaService { + + private final AdminClientService adminClientService; + + public Flux all(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) + .flatMapIterable(map -> + map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()); + } + + //returns 201 is new entity was created, 204 if exsiting was updated + public Mono upsert(KafkaCluster cluster, + @Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map quotas) { + + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 9484948b67e..8f68bb049f0 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1910,6 +1910,50 @@ paths: 200: description: OK + /api/clusters/{clusterName}/clientquotas: + get: + tags: + - ClientQuotas + summary: listQuotas + operationId: listQuotas + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ClientQuotas' + post: + tags: + - ClientQuotas + summary: upsertClientQuotas + operationId: upsertClientQuotas + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ClientQuotas' + responses: + 201: + description: Quota created + 204: + description: Existing quota updated + + /api/clusters/{clusterName}/acl/streamApp: post: tags: @@ -2175,6 +2219,7 @@ components: - TOPIC_DELETION - KAFKA_ACL_VIEW # get ACLs listing - KAFKA_ACL_EDIT # create & delete ACLs + - CLIENT_QUOTA_MANAGEMENT required: - id - name @@ -3701,6 +3746,20 @@ components: nullable: false type: string + ClientQuotas: + type: object + properties: + user: + type: string + clientId: + type: string + ip: + type: string + quotas: + type: object + additionalProperties: + type: number + KafkaAclResourceType: type: string enum: From 1586f75d9ff23a8f00886c82aff7f05ccc0878ac Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 16:49:59 +0400 Subject: [PATCH 02/10] wip --- .../ui/controller/ClientQuotasController.java | 20 ++++-- .../kafka/ui/service/quota/QuotaService.java | 69 +++++++++++++++++-- .../main/resources/swagger/kafka-ui-api.yaml | 4 +- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index 709e82bf94c..acb97496ed9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -9,9 +9,9 @@ import com.provectus.kafka.ui.service.quota.QuotaService; import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.math.BigDecimal; +import 
java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; @@ -30,16 +30,15 @@ public class ClientQuotasController extends AbstractController implements Client @Override public Mono>> listQuotas(String clusterName, ServerWebExchange exchange) { - return Mono.just(quotaService.all(getCluster(clusterName)).map(this::map)) + return Mono.just(quotaService.list(getCluster(clusterName)).map(this::map)) .map(ResponseEntity::ok); } @Override public Mono> upsertClientQuotas(String clusterName, - Mono clientQuotasDTO, + Mono clientQuotasDto, ServerWebExchange exchange) { - - return clientQuotasDTO.flatMap( + return clientQuotasDto.flatMap( quotas -> quotaService.upsert( getCluster(clusterName), @@ -60,8 +59,15 @@ private ClientQuotasDTO map(ClientQuotaRecord quotaRecord) { .clientId(quotaRecord.clientId()) .ip(quotaRecord.ip()) .quotas( - quotaRecord.quotas().entrySet().stream() - .collect(toMap(Map.Entry::getKey, e -> BigDecimal.valueOf(e.getValue()))) + quotaRecord.quotas().entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .collect(toMap( + Map.Entry::getKey, + e -> BigDecimal.valueOf(e.getValue()), + (v1, v2) -> null, //won't be called + LinkedHashMap::new //to keep order + )) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java index 3ff8ac5f691..2d20ca48a18 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java @@ -1,15 +1,29 @@ package com.provectus.kafka.ui.service.quota; +import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID; +import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; +import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; + +import com.google.common.collect.Sets; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; import jakarta.annotation.Nullable; -import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.apache.kafka.common.quota.ClientQuotaFilterComponent; +import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatusCode; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; @@ -21,19 +35,64 @@ public class QuotaService { private final AdminClientService adminClientService; - public Flux all(KafkaCluster cluster) { + public Flux list(KafkaCluster cluster) { return adminClientService.get(cluster) .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) .flatMapIterable(map -> map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()); } - //returns 201 is new entity was created, 204 if exsiting was 
updated - public Mono upsert(KafkaCluster cluster, + //returns 201 is new entity was created, 200 if existing was updated, 204 if it was deleted + public Mono upsert(KafkaCluster cluster, @Nullable String user, @Nullable String clientId, @Nullable String ip, - Map quotas) { + Map newQuotas) { + ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip); + return adminClientService.get(cluster) + .flatMap(ac -> + findQuotas(ac, quotaEntity) + .flatMap(currentQuotas -> { + Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); + List ops = Stream.concat( + quotasToClear.stream() + //setting null value to clear current state + .map(name -> new ClientQuotaAlteration.Op(name, null)), + newQuotas.entrySet().stream() + .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) + ).toList(); + + HttpStatus result = HttpStatus.OK; //updated + if (newQuotas.isEmpty()) { + result = HttpStatus.NO_CONTENT; //deleted + } else if (currentQuotas.isEmpty()) { + result = HttpStatus.CREATED; + } + return ac.alterClientQuota(new ClientQuotaAlteration(quotaEntity, ops)) + .thenReturn(result); + }) + ); + } + + private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { + if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { + throw new ValidationException("Quota entity id is not set"); + } + var id = new HashMap(); + Optional.ofNullable(user).ifPresent(u -> id.put(USER, u)); + Optional.ofNullable(clientId).ifPresent(cid -> id.put(CLIENT_ID, cid)); + Optional.ofNullable(ip).ifPresent(i -> id.put(IP, i)); + return new ClientQuotaEntity(id); + } + + private Mono> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) { + return ac.getClientQuotas(searchFilter(quotaEntity)) + .map(foundRecords -> Optional.ofNullable(foundRecords.get(quotaEntity)).orElse(Map.of())); + } + private ClientQuotaFilter searchFilter(ClientQuotaEntity quotaEntity) { + List filters = new ArrayList<>(); + quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name))); + return ClientQuotaFilter.contains(filters); } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 8f68bb049f0..09bbb7fc7e0 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1948,10 +1948,12 @@ paths: schema: $ref: '#/components/schemas/ClientQuotas' responses: + 200: + description: Existing quota updated 201: description: Quota created 204: - description: Existing quota updated + description: Existing quota deleted /api/clusters/{clusterName}/acl/streamApp: From bf06c34f78021a9652223776d2f86411a87885f8 Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 17:55:00 +0400 Subject: [PATCH 03/10] wip --- .../ui/controller/ClientQuotasController.java | 53 +++++++++++++------ .../kafka/ui/model/rbac/AccessContext.java | 12 ++++- .../kafka/ui/model/rbac/Permission.java | 5 +- .../kafka/ui/model/rbac/Resource.java | 3 +- .../rbac/permission/ClientQuotaAction.java | 18 +++++++ .../rbac/permission/PermissibleAction.java | 2 +- .../ui/service/quota/ClientQuotaRecord.java | 10 +++- ...taService.java => ClientQuotaService.java} | 48 +++++++++-------- .../ui/service/rbac/AccessControlService.java | 20 ++++++- .../service/quota/ClientQuotaServiceTest.java | 11 ++++ .../main/resources/swagger/kafka-ui-api.yaml | 5 +- 11 files changed, 141 
insertions(+), 46 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/{QuotaService.java => ClientQuotaService.java} (65%) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index acb97496ed9..59fc5cdcc2c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -4,10 +4,10 @@ import com.provectus.kafka.ui.api.ClientQuotasApi; import com.provectus.kafka.ui.model.ClientQuotasDTO; -import com.provectus.kafka.ui.service.audit.AuditService; +import com.provectus.kafka.ui.model.rbac.AccessContext; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.service.quota.ClientQuotaRecord; -import com.provectus.kafka.ui.service.quota.QuotaService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; +import com.provectus.kafka.ui.service.quota.ClientQuotaService; import java.math.BigDecimal; import java.util.LinkedHashMap; import java.util.Map; @@ -23,37 +23,56 @@ @RequiredArgsConstructor public class ClientQuotasController extends AbstractController implements ClientQuotasApi { - private final QuotaService quotaService; - private final AccessControlService accessControlService; - private final AuditService auditService; + private final ClientQuotaService clientQuotaService; @Override public Mono>> listQuotas(String clusterName, ServerWebExchange exchange) { - return Mono.just(quotaService.list(getCluster(clusterName)).map(this::map)) - .map(ResponseEntity::ok); + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("listClientQuotas") + .clientQuotaActions(ClientQuotaAction.VIEW) + .build(); + + Mono>> operation = + Mono.just(clientQuotaService.list(getCluster(clusterName)).map(this::mapToDto)) + .map(ResponseEntity::ok); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); } @Override public Mono> upsertClientQuotas(String clusterName, - Mono clientQuotasDto, + Mono quotasDto, ServerWebExchange exchange) { - return clientQuotasDto.flatMap( - quotas -> - quotaService.upsert( + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("upsertClientQuotas") + .clientQuotaActions(ClientQuotaAction.EDIT) + .build(); + + Mono> operation = quotasDto.flatMap( + newQuotas -> + clientQuotaService.upsert( getCluster(clusterName), - quotas.getUser(), - quotas.getClientId(), - quotas.getIp(), - Optional.ofNullable(quotas.getQuotas()).orElse(Map.of()) + newQuotas.getUser(), + newQuotas.getClientId(), + newQuotas.getIp(), + Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) .entrySet() .stream() .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) ) ).map(statusCode -> ResponseEntity.status(statusCode).build()); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); } - private ClientQuotasDTO map(ClientQuotaRecord quotaRecord) { + private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) { return new ClientQuotasDTO() .user(quotaRecord.user()) 
.clientId(quotaRecord.clientId()) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java index cf126bf3dfe..75560fddb4b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java @@ -3,6 +3,7 @@ import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.AuditAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -44,6 +45,8 @@ public class AccessContext { Collection auditAction; + Collection clientQuotaActions; + String operationName; Object operationParams; @@ -67,6 +70,7 @@ public static final class AccessContextBuilder { private Collection ksqlActions = Collections.emptySet(); private Collection aclActions = Collections.emptySet(); private Collection auditActions = Collections.emptySet(); + private Collection clientQuotaActions = Collections.emptySet(); private String operationName; private Object operationParams; @@ -158,6 +162,12 @@ public AccessContextBuilder auditActions(AuditAction... actions) { return this; } + public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) { + Assert.isTrue(actions.length > 0, "actions not present"); + this.clientQuotaActions = List.of(actions); + return this; + } + public AccessContextBuilder operationName(String operationName) { this.operationName = operationName; return this; @@ -182,7 +192,7 @@ public AccessContext build() { connect, connectActions, connector, schema, schemaActions, - ksqlActions, aclActions, auditActions, + ksqlActions, aclActions, auditActions, clientQuotaActions, operationName, operationParams); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java index 56b0a098026..95810cbeda0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java @@ -3,12 +3,14 @@ import static com.provectus.kafka.ui.model.rbac.Resource.ACL; import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT; +import static com.provectus.kafka.ui.model.rbac.Resource.CLIENT_QUOTAS; import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.KSQL; import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.AuditAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -32,7 +34,7 @@ public class Permission { private static final List RBAC_ACTION_EXEMPT_LIST = - List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, 
ACL, AUDIT); + List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT, CLIENT_QUOTAS); Resource resource; List actions; @@ -88,6 +90,7 @@ private List getAllActionValues() { case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList(); case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList(); case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList(); + case CLIENT_QUOTAS -> Arrays.stream(ClientQuotaAction.values()).map(Enum::toString).toList(); }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java index ca2efab3a9a..f724c4326f1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java @@ -13,7 +13,8 @@ public enum Resource { CONNECT, KSQL, ACL, - AUDIT; + AUDIT, + CLIENT_QUOTAS; @Nullable public static Resource fromString(String name) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java new file mode 100644 index 00000000000..1b451f7c7c9 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java @@ -0,0 +1,18 @@ +package com.provectus.kafka.ui.model.rbac.permission; + +import java.util.Set; + +public enum ClientQuotaAction implements PermissibleAction { + + VIEW, + EDIT + + ; + + public static final Set ALTER_ACTIONS = Set.of(EDIT); + + public boolean isAlter() { + return ALTER_ACTIONS.contains(this); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java index 5de78a15178..1f8952d57a7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java @@ -4,5 +4,5 @@ public sealed interface PermissibleAction permits AclAction, ApplicationConfigAction, ConsumerGroupAction, SchemaAction, ConnectAction, ClusterConfigAction, - KsqlAction, TopicAction, AuditAction { + KsqlAction, TopicAction, AuditAction, ClientQuotaAction { } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java index 8dc0e49e453..820ec56043b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service.quota; import jakarta.annotation.Nullable; +import java.util.Comparator; import java.util.Map; import org.apache.kafka.common.quota.ClientQuotaEntity; @@ -9,12 +10,17 @@ public record ClientQuotaRecord(@Nullable String user, @Nullable String ip, Map quotas) { - static ClientQuotaRecord create(ClientQuotaEntity entity, Map qoutas) { + static final Comparator COMPARATOR = + Comparator.comparing(r -> r.user) + .thenComparing(r -> r.clientId) + .thenComparing(r -> r.ip); + + static ClientQuotaRecord create(ClientQuotaEntity entity, Map quotas) { return new ClientQuotaRecord( entity.entries().get(ClientQuotaEntity.USER), 
entity.entries().get(ClientQuotaEntity.CLIENT_ID), entity.entries().get(ClientQuotaEntity.IP), - qoutas + quotas ); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java similarity index 65% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java index 2d20ca48a18..f132cf17ead 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -24,14 +24,13 @@ import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.quota.ClientQuotaFilterComponent; import org.springframework.http.HttpStatus; -import org.springframework.http.HttpStatusCode; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Service @RequiredArgsConstructor -public class QuotaService { +public class ClientQuotaService { private final AdminClientService adminClientService; @@ -39,36 +38,29 @@ public Flux list(KafkaCluster cluster) { return adminClientService.get(cluster) .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) .flatMapIterable(map -> - map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()); + map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()) + .sort(ClientQuotaRecord.COMPARATOR); } - //returns 201 is new entity was created, 200 if existing was updated, 204 if it was deleted + //returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted public Mono upsert(KafkaCluster cluster, - @Nullable String user, - @Nullable String clientId, - @Nullable String ip, - Map newQuotas) { + @Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map newQuotas) { ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip); return adminClientService.get(cluster) .flatMap(ac -> findQuotas(ac, quotaEntity) .flatMap(currentQuotas -> { - Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); - List ops = Stream.concat( - quotasToClear.stream() - //setting null value to clear current state - .map(name -> new ClientQuotaAlteration.Op(name, null)), - newQuotas.entrySet().stream() - .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) - ).toList(); - HttpStatus result = HttpStatus.OK; //updated if (newQuotas.isEmpty()) { result = HttpStatus.NO_CONTENT; //deleted } else if (currentQuotas.isEmpty()) { result = HttpStatus.CREATED; } - return ac.alterClientQuota(new ClientQuotaAlteration(quotaEntity, ops)) + var alteration = createAlteration(quotaEntity, currentQuotas, newQuotas); + return ac.alterClientQuota(alteration) .thenReturn(result); }) ); @@ -85,12 +77,26 @@ private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String cl return new ClientQuotaEntity(id); } + private ClientQuotaAlteration createAlteration(ClientQuotaEntity quotaEntity, + Map currentQuotas, + Map newQuotas) { + Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); + List ops = Stream.concat( + quotasToClear.stream() + .map(name -> new ClientQuotaAlteration.Op(name, null)), //setting null value to clear current state + newQuotas.entrySet().stream() + 
.map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) + ).toList(); + return new ClientQuotaAlteration(quotaEntity, ops); + } + + // returns empty map if no quotas found for an entity private Mono> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) { - return ac.getClientQuotas(searchFilter(quotaEntity)) - .map(foundRecords -> Optional.ofNullable(foundRecords.get(quotaEntity)).orElse(Map.of())); + return ac.getClientQuotas(crateSearchFilter(quotaEntity)) + .map(found -> Optional.ofNullable(found.get(quotaEntity)).orElse(Map.of())); } - private ClientQuotaFilter searchFilter(ClientQuotaEntity quotaEntity) { + private ClientQuotaFilter crateSearchFilter(ClientQuotaEntity quotaEntity) { List filters = new ArrayList<>(); quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name))); return ClientQuotaFilter.contains(filters); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java index 59ea02fea84..69a851ded6d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java @@ -123,7 +123,8 @@ && isConnectorAccessible(context, user) // TODO connector selectors && isSchemaAccessible(context, user) && isKsqlAccessible(context, user) && isAclAccessible(context, user) - && isAuditAccessible(context, user); + && isAuditAccessible(context, user) + && isClientQuotaAccessible(context, user); if (!accessGranted) { throw new AccessDeniedException(ACCESS_DENIED); @@ -417,6 +418,23 @@ private boolean isAuditAccessible(AccessContext context, AuthenticatedUser user) return isAccessible(Resource.AUDIT, null, user, context, requiredActions); } + private boolean isClientQuotaAccessible(AccessContext context, AuthenticatedUser user) { + if (!rbacEnabled) { + return true; + } + + if (context.getClientQuotaActions().isEmpty()) { + return true; + } + + Set requiredActions = context.getClientQuotaActions() + .stream() + .map(a -> a.toString().toUpperCase()) + .collect(Collectors.toSet()); + + return isAccessible(Resource.CLIENT_QUOTAS, null, user, context, requiredActions); + } + public Set getOauthExtractors() { return oauthExtractors; } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java new file mode 100644 index 00000000000..1ca42f9de86 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -0,0 +1,11 @@ +package com.provectus.kafka.ui.service.quota; + +import com.provectus.kafka.ui.AbstractIntegrationTest; +import org.springframework.beans.factory.annotation.Autowired; + +class ClientQuotaServiceTest extends AbstractIntegrationTest { + + @Autowired + ClientQuotaService quotaService; + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 09bbb7fc7e0..57a0550616a 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1936,6 +1936,9 @@ paths: - ClientQuotas summary: upsertClientQuotas operationId: upsertClientQuotas + description: | + - updates/creates client quota record if `quotas` 
field is non-empty + - deletes client quota record if `quotas` field is null or empty parameters: - name: clusterName in: path @@ -1951,7 +1954,7 @@ paths: 200: description: Existing quota updated 201: - description: Quota created + description: New quota created 204: description: Existing quota deleted From 551357207e364e9afb233abe32430dd0f53ff7ae Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 17:56:52 +0400 Subject: [PATCH 04/10] kafka-ui.yaml reverted --- documentation/compose/kafka-ui.yaml | 267 +++++++++++++--------------- 1 file changed, 124 insertions(+), 143 deletions(-) diff --git a/documentation/compose/kafka-ui.yaml b/documentation/compose/kafka-ui.yaml index bf558696859..ddaa9867cab 100644 --- a/documentation/compose/kafka-ui.yaml +++ b/documentation/compose/kafka-ui.yaml @@ -8,165 +8,146 @@ services: ports: - 8080:8080 depends_on: - - kafka2 + - kafka0 - kafka1 + - schemaregistry0 + - schemaregistry1 + - kafka-connect0 environment: KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092 + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 + KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085 + KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first + KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 + KAFKA_CLUSTERS_1_NAME: secondLocal + KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092 + KAFKA_CLUSTERS_1_METRICS_PORT: 9998 + KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085 + DYNAMIC_CONFIG_ENABLED: 'true' - kafka1: + kafka0: image: confluentinc/cp-kafka:7.2.1 - container_name: kafka1 + hostname: kafka0 + container_name: kafka0 + ports: + - "9092:9092" + - "9997:9997" environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9997 + KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 + KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_NODE_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093' + KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093' - KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' volumes: - - ./scripts/update_run_cluster.sh:/tmp/update_run.sh + - ./scripts/update_run.sh:/tmp/update_run.sh command: "bash -c 'if [ ! 
-f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" - kafka2: + kafka1: image: confluentinc/cp-kafka:7.2.1 - container_name: kafka2 + hostname: kafka1 + container_name: kafka1 + ports: + - "9093:9092" + - "9998:9998" environment: - KAFKA_NODE_ID: 2 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_LISTENERS: PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092 - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093' + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9998 + KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998 KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093' + KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' volumes: - - ./scripts/update_run_cluster.sh:/tmp/update_run.sh + - ./scripts/update_run.sh:/tmp/update_run.sh command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" -# kafka0: -# image: confluentinc/cp-kafka:7.2.1 -# hostname: kafka0 -# container_name: kafka0 -# ports: -# - "9092:9092" -# - "9997:9997" -# environment: -# KAFKA_BROKER_ID: 1 -# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' -# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092' -# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 -# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 -# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 -# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 -# KAFKA_JMX_PORT: 9997 -# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 -# KAFKA_PROCESS_ROLES: 'broker,controller' -# KAFKA_NODE_ID: 1 -# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093' -# KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092' -# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' -# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' -# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' -# volumes: -# - ./scripts/update_run.sh:/tmp/update_run.sh -# command: "bash -c 'if [ ! 
-f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" -# -# kafka1: -# image: confluentinc/cp-kafka:7.2.1 -# hostname: kafka1 -# container_name: kafka1 -# ports: -# - "9093:9092" -# - "9998:9998" -# environment: -# KAFKA_BROKER_ID: 1 -# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' -# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092' -# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 -# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 -# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 -# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 -# KAFKA_JMX_PORT: 9998 -# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998 -# KAFKA_PROCESS_ROLES: 'broker,controller' -# KAFKA_NODE_ID: 1 -# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093' -# KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092' -# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' -# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' -# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' -# volumes: -# - ./scripts/update_run.sh:/tmp/update_run.sh -# command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" + schemaregistry0: + image: confluentinc/cp-schema-registry:7.2.1 + ports: + - 8085:8085 + depends_on: + - kafka0 + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 + SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT + SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 + SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 + + SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO + SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas -# schemaregistry0: -# image: confluentinc/cp-schema-registry:7.2.1 -# ports: -# - 8085:8085 -# depends_on: -# - kafka0 -# environment: -# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 -# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT -# SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 -# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 -# -# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" -# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO -# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas -# -# schemaregistry1: -# image: confluentinc/cp-schema-registry:7.2.1 -# ports: -# - 18085:8085 -# depends_on: -# - kafka1 -# environment: -# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 -# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT -# SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 -# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 -# -# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" -# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO -# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas -# -# kafka-connect0: -# image: confluentinc/cp-kafka-connect:7.2.1 -# ports: -# - 8083:8083 -# depends_on: -# - kafka0 -# - schemaregistry0 -# environment: -# CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 -# CONNECT_GROUP_ID: compose-connect-group -# CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs -# 
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 -# CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset -# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 -# CONNECT_STATUS_STORAGE_TOPIC: _connect_status -# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 -# CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter -# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 -# CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter -# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 -# CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter -# CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter -# CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 -# CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" -# -# kafka-init-topics: -# image: confluentinc/cp-kafka:7.2.1 -# volumes: -# - ./data/message.json:/data/message.json -# depends_on: -# - kafka1 -# command: "bash -c 'echo Waiting for Kafka to be ready... && \ -# cub kafka-ready -b kafka1:29092 1 30 && \ -# kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ -# kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ -# kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \ -# kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'" + schemaregistry1: + image: confluentinc/cp-schema-registry:7.2.1 + ports: + - 18085:8085 + depends_on: + - kafka1 + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 + SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT + SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 + SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 + + SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO + SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas + + kafka-connect0: + image: confluentinc/cp-kafka-connect:7.2.1 + ports: + - 8083:8083 + depends_on: + - kafka0 + - schemaregistry0 + environment: + CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: _connect_status + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 + CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" + + kafka-init-topics: + image: confluentinc/cp-kafka:7.2.1 + volumes: + - ./data/message.json:/data/message.json + depends_on: + - kafka1 + command: "bash -c 'echo Waiting for Kafka to be ready... 
&& \ + cub kafka-ready -b kafka1:29092 1 30 && \ + kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ + kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ + kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \ + kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'" From 8e50d59a445536df7f56249d8411d6f05d953990 Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 18:56:45 +0400 Subject: [PATCH 05/10] wip --- .../ui/service/quota/ClientQuotaService.java | 11 +++-- .../service/quota/ClientQuotaServiceTest.java | 44 +++++++++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 1 + 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java index f132cf17ead..3c78a9bf818 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -4,6 +4,7 @@ import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; @@ -38,8 +39,11 @@ public Flux list(KafkaCluster cluster) { return adminClientService.get(cluster) .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) .flatMapIterable(map -> - map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()) - .sort(ClientQuotaRecord.COMPARATOR); + map.entrySet().stream() + .map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())) + .sorted(ClientQuotaRecord.COMPARATOR) + .toList() + ); } //returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted @@ -66,7 +70,8 @@ public Mono upsert(KafkaCluster cluster, ); } - private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { + @VisibleForTesting + static ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { throw new ValidationException("Quota entity id is not set"); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java index 1ca42f9de86..d79a62c78af 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -1,11 +1,55 @@ package com.provectus.kafka.ui.service.quota; +import static org.assertj.core.api.Assertions.assertThat; + import com.provectus.kafka.ui.AbstractIntegrationTest; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.ClustersStorage; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; 
+import org.junit.jupiter.params.provider.CsvSource; import org.springframework.beans.factory.annotation.Autowired; +import reactor.test.StepVerifier; class ClientQuotaServiceTest extends AbstractIntegrationTest { @Autowired ClientQuotaService quotaService; + private KafkaCluster cluster; + + @BeforeEach + void init() { + cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).get(); + } + + @ParameterizedTest + @CsvSource( + value = { + "testUser, null, null ", + "null, testUserId, null", + "testUser2, testUserId2, null", + "null, null, 127.0.0.1" + }, + nullValues = "null" + ) + void createsQuotaRecord(String user, String clientId, String ip) { + StepVerifier.create( + quotaService.upsert( + cluster, + user, + clientId, + ip, + Map.of( + "producer_byte_rate", 123.0, + "consumer_byte_rate", 234.0, + "request_percentage", 10.0 + ) + ) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(201)) + .verifyComplete(); + } + } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 57a0550616a..98fe8eed2e8 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3650,6 +3650,7 @@ components: - KSQL - ACL - AUDIT + - CLIENT_QUOTAS KafkaAcl: type: object From 5c6cd565a670b68899c31d89a88ea6ee7478e053 Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 19:24:24 +0400 Subject: [PATCH 06/10] wip --- .../ui/controller/ClientQuotasController.java | 45 ++++++++++--------- .../rbac/permission/ClientQuotaAction.java | 1 + .../kafka/ui/service/audit/AuditRecord.java | 2 + .../ui/service/quota/ClientQuotaService.java | 4 +- .../ui/service/audit/AuditWriterTest.java | 8 +++- .../service/quota/ClientQuotaServiceTest.java | 29 +++++++++--- 6 files changed, 56 insertions(+), 33 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index 59fc5cdcc2c..64f42045620 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -47,29 +47,32 @@ public Mono>> listQuotas(String clusterName public Mono> upsertClientQuotas(String clusterName, Mono quotasDto, ServerWebExchange exchange) { - var context = AccessContext.builder() - .cluster(clusterName) - .operationName("upsertClientQuotas") - .clientQuotaActions(ClientQuotaAction.EDIT) - .build(); + return quotasDto.flatMap( + newQuotas -> { + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("upsertClientQuotas") + .operationParams(Map.of("newQuotas", newQuotas)) + .clientQuotaActions(ClientQuotaAction.EDIT) + .build(); - Mono> operation = quotasDto.flatMap( - newQuotas -> - clientQuotaService.upsert( - getCluster(clusterName), - newQuotas.getUser(), - newQuotas.getClientId(), - newQuotas.getIp(), - Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) - .entrySet() - .stream() - .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) - ) - ).map(statusCode -> ResponseEntity.status(statusCode).build()); + Mono> operation = clientQuotaService.upsert( + getCluster(clusterName), + newQuotas.getUser(), + newQuotas.getClientId(), + newQuotas.getIp(), + Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) + 
.entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) + ) + .map(statusCode -> ResponseEntity.status(statusCode).build()); - return validateAccess(context) - .then(operation) - .doOnEach(sig -> audit(context, sig)); + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); + } + ); } private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java index 1b451f7c7c9..b2ff994d0d0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java @@ -11,6 +11,7 @@ public enum ClientQuotaAction implements PermissibleAction { public static final Set ALTER_ACTIONS = Set.of(EDIT); + @Override public boolean isAlter() { return ALTER_ACTIONS.contains(this); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java index 3f7fb44aacd..870da8668df 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java @@ -65,6 +65,8 @@ static List getAccessedResources(AccessContext ctx) { .forEach(a -> resources.add(create(a, Resource.ACL, null))); ctx.getAuditAction() .forEach(a -> resources.add(create(a, Resource.AUDIT, null))); + ctx.getClientQuotaActions() + .forEach(a -> resources.add(create(a, Resource.CLIENT_QUOTAS, null))); return resources; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java index 3c78a9bf818..97f17c12c89 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -4,7 +4,6 @@ import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; @@ -70,8 +69,7 @@ public Mono upsert(KafkaCluster cluster, ); } - @VisibleForTesting - static ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { + private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { throw new ValidationException("Quota entity id is not set"); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java index 5bcee45ac8e..4213e888d3f 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java @@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import 
com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder; import com.provectus.kafka.ui.model.rbac.permission.AclAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -55,9 +56,11 @@ static Stream onlyLogsWhenAlterOperationIsPresentForOneOfResource SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a)); Stream> connEditActions = ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a)); + Stream> quotaEditActions = + ClientQuotaAction.ALTER_ACTIONS.stream().map(a -> c -> c.clientQuotaActions(a)); return Stream.of( topicEditActions, clusterConfigEditActions, aclEditActions, - cgEditActions, connEditActions, schemaEditActions + cgEditActions, connEditActions, schemaEditActions, quotaEditActions ) .flatMap(c -> c) .map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); @@ -78,7 +81,8 @@ static Stream doesNothingIfNoResourceHasAlterAction() { c -> c.aclActions(AclAction.VIEW), c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW), c -> c.schema("sc").schemaActions(SchemaAction.VIEW), - c -> c.connect("conn").connectActions(ConnectAction.VIEW) + c -> c.connect("conn").connectActions(ConnectAction.VIEW), + c -> c.clientQuotaActions(ClientQuotaAction.VIEW) ).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java index d79a62c78af..cffc012ca31 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -30,17 +30,13 @@ void init() { "testUser, null, null ", "null, testUserId, null", "testUser2, testUserId2, null", - "null, null, 127.0.0.1" }, nullValues = "null" ) - void createsQuotaRecord(String user, String clientId, String ip) { + void createUpdateDelete(String user, String clientId, String ip) { + //creating new StepVerifier.create( - quotaService.upsert( - cluster, - user, - clientId, - ip, + quotaService.upsert(cluster, user, clientId, ip, Map.of( "producer_byte_rate", 123.0, "consumer_byte_rate", 234.0, @@ -50,6 +46,25 @@ void createsQuotaRecord(String user, String clientId, String ip) { ) .assertNext(status -> assertThat(status.value()).isEqualTo(201)) .verifyComplete(); + + //updating + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, + Map.of( + "producer_byte_rate", 111111.0, + "consumer_byte_rate", 22222.0 + ) + ) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(200)) + .verifyComplete(); + + //deleting just created record + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, Map.of()) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(204)) + .verifyComplete(); } } From a02d537f1e039117086c5dc2e2989e261551f1bc Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 19:33:56 +0400 Subject: [PATCH 07/10] wip --- .../service/quota/ClientQuotaServiceTest.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git 
a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java index cffc012ca31..c4751717321 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -34,37 +34,45 @@ void init() { nullValues = "null" ) void createUpdateDelete(String user, String clientId, String ip) { + var quotas = Map.of( + "producer_byte_rate", 123.0, + "consumer_byte_rate", 234.0, + "request_percentage", 10.0 + ); + //creating new StepVerifier.create( - quotaService.upsert(cluster, user, clientId, ip, - Map.of( - "producer_byte_rate", 123.0, - "consumer_byte_rate", 234.0, - "request_percentage", 10.0 - ) - ) + quotaService.upsert(cluster, user, clientId, ip, quotas) ) .assertNext(status -> assertThat(status.value()).isEqualTo(201)) .verifyComplete(); + assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, quotas))) + .isTrue(); + //updating StepVerifier.create( - quotaService.upsert(cluster, user, clientId, ip, - Map.of( - "producer_byte_rate", 111111.0, - "consumer_byte_rate", 22222.0 - ) - ) + quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 111111.0)) ) .assertNext(status -> assertThat(status.value()).isEqualTo(200)) .verifyComplete(); - //deleting just created record + assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 111111.0)))) + .isTrue(); + + //deleting created record StepVerifier.create( quotaService.upsert(cluster, user, clientId, ip, Map.of()) ) .assertNext(status -> assertThat(status.value()).isEqualTo(204)) .verifyComplete(); + + assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 111111.0)))) + .isFalse(); + } + + private boolean quotaRecordExisits(ClientQuotaRecord rec) { + return quotaService.list(cluster).collectList().block().contains(rec); } } From 2b2a12c336671857c58f02c83b63615fc8ac8b82 Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 19:35:52 +0400 Subject: [PATCH 08/10] reverted unrelated files --- documentation/compose/kafka-ui.yaml | 2 +- .../java/com/provectus/kafka/ui/KafkaUiApplication.java | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/documentation/compose/kafka-ui.yaml b/documentation/compose/kafka-ui.yaml index ddaa9867cab..14a269ca7cb 100644 --- a/documentation/compose/kafka-ui.yaml +++ b/documentation/compose/kafka-ui.yaml @@ -142,7 +142,7 @@ services: kafka-init-topics: image: confluentinc/cp-kafka:7.2.1 volumes: - - ./data/message.json:/data/message.json + - ./data/message.json:/data/message.json depends_on: - kafka1 command: "bash -c 'echo Waiting for Kafka to be ready... 
&& \ diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java index a7f0db04371..8d0eafeff39 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java @@ -1,9 +1,6 @@ package com.provectus.kafka.ui; import com.provectus.kafka.ui.util.DynamicConfigOperations; -import java.util.Map; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -17,10 +14,7 @@ public class KafkaUiApplication { public static void main(String[] args) { - AdminClient ac = AdminClient.create(Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" - )); - System.out.println(ac); + startApplication(args); } public static ConfigurableApplicationContext startApplication(String[] args) { From 1336ec33cb90cff7467b6899aed97e4a965e99b0 Mon Sep 17 00:00:00 2001 From: iliax Date: Sun, 13 Aug 2023 14:27:03 +0400 Subject: [PATCH 09/10] wip --- .../kafka/ui/controller/ClientQuotasController.java | 13 +++++++++++-- .../kafka/ui/service/quota/ClientQuotaRecord.java | 6 ------ .../kafka/ui/service/quota/ClientQuotaService.java | 8 ++------ 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index 64f42045620..1d0ecda4a42 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -9,6 +9,7 @@ import com.provectus.kafka.ui.service.quota.ClientQuotaRecord; import com.provectus.kafka.ui.service.quota.ClientQuotaService; import java.math.BigDecimal; +import java.util.Comparator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -23,6 +24,11 @@ @RequiredArgsConstructor public class ClientQuotasController extends AbstractController implements ClientQuotasApi { + private static final Comparator QUOTA_RECORDS_COMPARATOR = + Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user)) + .thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::clientId))) + .thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::ip))); + private final ClientQuotaService clientQuotaService; @Override @@ -35,8 +41,11 @@ public Mono>> listQuotas(String clusterName .build(); Mono>> operation = - Mono.just(clientQuotaService.list(getCluster(clusterName)).map(this::mapToDto)) - .map(ResponseEntity::ok); + Mono.just( + clientQuotaService.list(getCluster(clusterName)) + .sort(QUOTA_RECORDS_COMPARATOR) + .map(this::mapToDto) + ).map(ResponseEntity::ok); return validateAccess(context) .then(operation) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java index 820ec56043b..a348b2a1ea9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java +++ 
b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java @@ -1,7 +1,6 @@ package com.provectus.kafka.ui.service.quota; import jakarta.annotation.Nullable; -import java.util.Comparator; import java.util.Map; import org.apache.kafka.common.quota.ClientQuotaEntity; @@ -10,11 +9,6 @@ public record ClientQuotaRecord(@Nullable String user, @Nullable String ip, Map quotas) { - static final Comparator COMPARATOR = - Comparator.comparing(r -> r.user) - .thenComparing(r -> r.clientId) - .thenComparing(r -> r.ip); - static ClientQuotaRecord create(ClientQuotaEntity entity, Map quotas) { return new ClientQuotaRecord( entity.entries().get(ClientQuotaEntity.USER), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java index 97f17c12c89..6f1b5be991b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -37,12 +37,8 @@ public class ClientQuotaService { public Flux list(KafkaCluster cluster) { return adminClientService.get(cluster) .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) - .flatMapIterable(map -> - map.entrySet().stream() - .map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())) - .sorted(ClientQuotaRecord.COMPARATOR) - .toList() - ); + .flatMapIterable(Map::entrySet) + .map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())); } //returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted From 26c21f733df3a41b51236a8b27acf294b0313c0b Mon Sep 17 00:00:00 2001 From: iliax Date: Sun, 13 Aug 2023 14:33:21 +0400 Subject: [PATCH 10/10] wip --- .../ui/controller/ClientQuotasController.java | 2 +- .../ui/service/quota/ClientQuotaService.java | 2 +- .../ui/service/quota/ClientQuotaServiceTest.java | 16 ++++++++-------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index 1d0ecda4a42..8be2cddcc13 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -42,7 +42,7 @@ public Mono>> listQuotas(String clusterName Mono>> operation = Mono.just( - clientQuotaService.list(getCluster(clusterName)) + clientQuotaService.getAll(getCluster(clusterName)) .sort(QUOTA_RECORDS_COMPARATOR) .map(this::mapToDto) ).map(ResponseEntity::ok); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java index 6f1b5be991b..679f7b9381d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -34,7 +34,7 @@ public class ClientQuotaService { private final AdminClientService adminClientService; - public Flux list(KafkaCluster cluster) { + public Flux getAll(KafkaCluster cluster) { return adminClientService.get(cluster) .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) .flatMapIterable(Map::entrySet) diff --git 
a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java index c4751717321..2f04008dd46 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -34,7 +34,7 @@ void init() { nullValues = "null" ) void createUpdateDelete(String user, String clientId, String ip) { - var quotas = Map.of( + var initialQuotas = Map.of( "producer_byte_rate", 123.0, "consumer_byte_rate", 234.0, "request_percentage", 10.0 @@ -42,22 +42,22 @@ void createUpdateDelete(String user, String clientId, String ip) { //creating new StepVerifier.create( - quotaService.upsert(cluster, user, clientId, ip, quotas) + quotaService.upsert(cluster, user, clientId, ip, initialQuotas) ) .assertNext(status -> assertThat(status.value()).isEqualTo(201)) .verifyComplete(); - assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, quotas))) + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, initialQuotas))) .isTrue(); //updating StepVerifier.create( - quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 111111.0)) + quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 22222.0)) ) .assertNext(status -> assertThat(status.value()).isEqualTo(200)) .verifyComplete(); - assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 111111.0)))) + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) .isTrue(); //deleting created record @@ -67,12 +67,12 @@ void createUpdateDelete(String user, String clientId, String ip) { .assertNext(status -> assertThat(status.value()).isEqualTo(204)) .verifyComplete(); - assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 111111.0)))) + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) .isFalse(); } - private boolean quotaRecordExisits(ClientQuotaRecord rec) { - return quotaService.list(cluster).collectList().block().contains(rec); + private boolean quotaRecordExists(ClientQuotaRecord rec) { + return quotaService.getAll(cluster).collectList().block().contains(rec); } }
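
A minimal usage sketch of the ClientQuotaService API introduced by this series (not part of the patches themselves). It assumes a Spring-injected ClientQuotaService bean and an already-resolved KafkaCluster; the entity name "alice" and the byte-rate values are only illustrative. The 201/200/204 contract mirrors the comment on upsert, the swagger response descriptions, and ClientQuotaServiceTest above.

    import com.provectus.kafka.ui.model.KafkaCluster;
    import com.provectus.kafka.ui.service.quota.ClientQuotaRecord;
    import com.provectus.kafka.ui.service.quota.ClientQuotaService;
    import java.util.Map;
    import reactor.core.publisher.Flux;

    // Hypothetical caller, not shipped with the patch series.
    class ClientQuotaUsageSketch {

      void demo(ClientQuotaService quotaService, KafkaCluster cluster) {
        // 201: no quota entity exists yet for user "alice", so a new record is created
        quotaService
            .upsert(cluster, "alice", null, null, Map.of("producer_byte_rate", 1024.0))
            .subscribe(status -> System.out.println("create -> " + status.value()));

        // 200: same entity with a different map -> the existing record is updated
        quotaService
            .upsert(cluster, "alice", null, null, Map.of("consumer_byte_rate", 2048.0))
            .subscribe(status -> System.out.println("update -> " + status.value()));

        // 204: an empty map (a null/empty `quotas` field on the REST API) deletes the record
        quotaService
            .upsert(cluster, "alice", null, null, Map.of())
            .subscribe(status -> System.out.println("delete -> " + status.value()));

        // Listing is unsorted at the service level; the controller sorts with
        // QUOTA_RECORDS_COMPARATOR before mapping records to DTOs
        Flux<ClientQuotaRecord> records = quotaService.getAll(cluster);
        records.subscribe(r -> System.out.println(r.user() + " -> " + r.quotas()));
      }
    }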