-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19315: Move ControllerMutationQuotaManager to server module #19807
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
… mock issue in KafkaApisTest
server/src/main/java/org/apache/kafka/server/UnboundedControllerMutationQuota.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/AbstractControllerMutationQuota.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/PermissiveControllerMutationQuota.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/PermissiveControllerMutationQuota.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientSensors.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @YutaLin for this patch, left some comments
server/src/main/java/org/apache/kafka/server/StrictControllerMutationQuota.java
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YutaLin: Please fix the conflicts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @YutaLin for update, few more comments
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/AbstractControllerMutationQuota.java
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientSensors.java
Outdated
Show resolved
Hide resolved
We will continue tracking the failing test PlaintextConsumerCommitTest > testCommitAsyncCompletedBeforeConsumerCloses [1] Type=Raft-Isolated, which is recorded in https://issues.apache.org/jira/browse/KAFKA-19352. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YutaLin thanks for this patch. a couple of comments are left
protected final Metrics metrics; | ||
private final QuotaType quotaType; | ||
protected final Time time; | ||
private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to keep it as member of ClientQuotaManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume you will remove this variable, but you don't. Could you please share the reason with me?
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YutaLin thanks for this patch. a couple of comments remain
server/src/main/java/org/apache/kafka/server/PermissiveControllerMutationQuota.java
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java
Show resolved
Hide resolved
|
||
public class ClientQuotaManager { | ||
|
||
static final class QuotaTypes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need a struct class?
} | ||
|
||
private void start() { | ||
throttledChannelReaper.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method can be inlined to constructor
this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? | ||
QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; | ||
|
||
this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
toString
is redunant
int throttleTimeMs | ||
) { | ||
if (throttleTimeMs > 0) { | ||
ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be cool if we can leverage var
to cleanup the type declaration.
var clientSensors = getOrCreateQuotaSensors(session, clientId);
clientSensors.throttleTimeSensor().record(throttleTimeMs);
var throttledChannel = new ThrottledChannel(time, throttleTimeMs, throttleCallback);
* Recording any larger value will always be throttled, even if no other values were recorded in the quota window. | ||
* This is used for deciding the maximum bytes that can be fetched at once | ||
*/ | ||
public double getMaxValueInQuotaWindow(Session session, String clientId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please remove the prefix get
* This is used for deciding the maximum bytes that can be fetched at once | ||
*/ | ||
public double getMaxValueInQuotaWindow(Session session, String clientId) { | ||
if (quotasEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!quotasEnabled()) return Double.MAX_VALUE;
var clientSensors = getOrCreateQuotaSensors(session, clientId);
var limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags());
if (limit != null) return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds;
return Double.MAX_VALUE;
Migrate ControllerMutationQuotaManager to Java implementation and move
to server module, including ClientQuotaManager and associated files.
Reviewers: PoAn Yang [email protected],
Ken Huang [email protected], TengYao
Chi [email protected], Chia-Ping
Tsai [email protected]