Skip to content

KAFKA-19315: Move ControllerMutationQuotaManager to server module #19807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 46 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
a11810c
feat: new ControllerMutationQuotaManager
YutaLin May 21, 2025
beeaa37
feat: add ClientSensors
YutaLin May 21, 2025
1e39f88
fix: style issue
YutaLin May 21, 2025
ece698f
feat: rewrite ClientQuotaManager and put in server module
YutaLin May 24, 2025
0e9c42e
feat: complete ControllerMutationQuotaManager
YutaLin May 24, 2025
bdc3932
feat: move ControllerMutationQuota out
YutaLin May 24, 2025
8b4d0f8
feat: replace scala version of ClientQuotaManager and ControllerMutat…
YutaLin May 25, 2025
03039d0
feat: remove ControllerMutationQuotaManager scala
YutaLin May 25, 2025
6d0a9d7
feat: remove ClientQuotaManager scala version and fix related files
YutaLin May 25, 2025
3ec768e
fix: remove method overloads in ControllerMutationQuotaManager to fix…
YutaLin May 25, 2025
f2d7168
Merge branch 'trunk' into KAFKA-19315
YutaLin May 25, 2025
3d6ce9b
fix: metricTagsToSensorSuffix in ClientQuotaManager
YutaLin May 25, 2025
188f04a
Merge branch 'trunk' into KAFKA-19315
YutaLin May 25, 2025
1e8d590
fix: fix matrictags with LinkedHashMap
YutaLin May 26, 2025
068d4fd
fix: import style
YutaLin May 26, 2025
8ca37a6
refactor: make metrictags immutable
YutaLin May 27, 2025
e491ab5
refactor: make UnboundedControllerMutationQuota inner
YutaLin May 27, 2025
12d2dd1
fix: java doc
YutaLin May 27, 2025
de33e8c
fix: make ClienSensors normal class
YutaLin May 27, 2025
009729c
fix: add null check in PermissiveControllerMutationQuota
YutaLin May 27, 2025
fe24c23
Merge branch 'trunk' into KAFKA-19315
YutaLin May 27, 2025
2618d23
style: import
YutaLin May 27, 2025
8d3f9ff
fix: add null check in StrictControllerMutationQuota
YutaLin May 27, 2025
bc35ff5
fix: remove unused comments
YutaLin May 27, 2025
a594c2c
fix: java docs and scala optional
YutaLin May 27, 2025
f02b95a
Merge branch trunk into KAFKA-19315
YutaLin May 28, 2025
b751ab6
fix: javadoc format
YutaLin May 28, 2025
2001e6d
fix: indentation
YutaLin May 28, 2025
d1d6f2f
fix: javadocs and comments
YutaLin May 28, 2025
7820866
Merge branch 'trunk' into KAFKA-19315
YutaLin May 28, 2025
2f31618
fix: remove equal method in ClientSensors
YutaLin May 28, 2025
a4327fe
Merge branch 'trunk' into KAFKA-19315
YutaLin May 28, 2025
db7a207
fix: add equal and hash method
YutaLin May 29, 2025
b819738
fix: metric tags suffix
YutaLin May 29, 2025
a3a93ff
Merge branch 'trunk' into KAFKA-19315
YutaLin May 29, 2025
40e9ecd
refactor: move QuotaTypes in ClientQuotaManager
YutaLin May 29, 2025
3ffb930
refactor: coding convention
YutaLin May 29, 2025
46f4736
refactor: make entity record
YutaLin May 29, 2025
20a7b69
fix: flatten code
YutaLin May 30, 2025
c004886
refactor: remove BaseUserEntity
YutaLin May 30, 2025
24dbb03
Merge branch 'trunk' into KAFKA-19315
YutaLin May 30, 2025
d33a8db
fix: NPComplexity
YutaLin May 30, 2025
f186386
docs: update format
YutaLin May 31, 2025
8c1a233
fix: remove Map copy of LinkedHashMap to prevent metric tags reorder
YutaLin May 31, 2025
8937e3a
Merge branch 'trunk' into KAFKA-19315
YutaLin May 31, 2025
b397043
Merge branch trunk into KAFKA-19315
YutaLin Jun 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control-server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.network.metrics" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.storage.internals.log" />
<allow pkg="org.apache.kafka.storage.internals.checkpoint" />
<allow pkg="org.apache.logging.log4j" />
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/java/kafka/server/ClientRequestQuotaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.ClientQuotaManager;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.server.quota.QuotaUtils;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import scala.jdk.javaapi.CollectionConverters;
import scala.jdk.javaapi.OptionConverters;

@SuppressWarnings("this-escape")
public class ClientRequestQuotaManager extends ClientQuotaManager {
// Since exemptSensor is for all clients and has a constant name, we do not expire exemptSensor and only
Expand All @@ -56,7 +55,7 @@ public ClientRequestQuotaManager(
String threadNamePrefix,
Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
) {
super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, OptionConverters.toScala(quotaCallbackPlugin));
super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, quotaCallbackPlugin);
this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
this.metrics = metrics;
this.exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization percentage");
Expand All @@ -72,8 +71,8 @@ private void recordExempt(double value) {
}

/**
* Records that a user/clientId changed request processing time being throttled. If quota has been violated, return
* throttle time in milliseconds. Throttle time calculation may be overridden by sub-classes.
* Records that a user/clientId changed request processing time being throttled. If the quota has been violated, return
* throttle time in milliseconds. Subclasses may override throttle time calculation.
* @param request client request
* @return Number of milliseconds to throttle in case of quota violation. Zero otherwise
*/
Expand Down Expand Up @@ -103,8 +102,8 @@ public long throttleTime(QuotaViolationException e, long timeMs) {
}

@Override
public MetricName clientQuotaMetricName(scala.collection.immutable.Map<String, String> quotaMetricTags) {
return metrics.metricName("request-time", QuotaType.REQUEST.toString(), "Tracking request-time per user/client-id", CollectionConverters.asJava(quotaMetricTags));
public MetricName clientQuotaMetricName(Map<String, String> quotaMetricTags) {
return metrics.metricName("request-time", QuotaType.REQUEST.toString(), "Tracking request-time per user/client-id", quotaMetricTags);
}

private double nanosToPercentage(long nanos) {
Expand Down
11 changes: 5 additions & 6 deletions core/src/main/java/kafka/server/QuotaFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.ClientQuotaManager;
import org.apache.kafka.server.ControllerMutationQuotaManager;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
Expand All @@ -29,8 +31,6 @@

import java.util.Optional;

import scala.Option;
import scala.jdk.javaapi.OptionConverters;

public class QuotaFactory {

Expand Down Expand Up @@ -124,13 +124,12 @@ public static QuotaManagers instantiate(
String role
) {
Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin = createClientQuotaCallback(cfg, metrics, role);
Option<Plugin<ClientQuotaCallback>> clientQuotaCallbackPluginOption = OptionConverters.toScala(clientQuotaCallbackPlugin);

return new QuotaManagers(
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, time, threadNamePrefix, clientQuotaCallbackPluginOption),
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, time, threadNamePrefix, clientQuotaCallbackPluginOption),
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, time, threadNamePrefix, clientQuotaCallbackPlugin),
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, time, threadNamePrefix, clientQuotaCallbackPlugin),
new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallbackPlugin),
new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallbackPluginOption),
new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallbackPlugin),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, QuotaType.LEADER_REPLICATION, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, QuotaType.FOLLOWER_REPLICATION, time),
new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.ControllerMutationQuota;

import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
Expand Down
Loading