diff --git a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java index 8bdb2879df6..1267856da69 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.entity.ClientGroup; @@ -445,15 +446,36 @@ protected class ClientEventSet { private final String group; private volatile long lastAccessTime = System.currentTimeMillis(); private volatile long lastConsumeTime = System.currentTimeMillis(); + /** + * Cache resolved max capacity to avoid per-offer SubscriptionGroupConfig lookup + attribute + * parsing on the hot dispatch path. Soft-cap semantics tolerate a short staleness window, + * so refresh lazily by TTL {@link BrokerConfig#getLiteEventCapacityCacheTtlMs()}. + */ + private volatile int maxCapacityCache; + private volatile long capacityRefreshTime = System.currentTimeMillis(); public ClientEventSet(String group) { this.group = group; - events = new LinkedBlockingQueue<>(LiteMetadataUtil.getMaxClientEventCount(group, brokerController)); + // Use a large bounded queue as a hard ceiling; the effective capacity is enforced + // dynamically via soft-cap in offer() so that maxClientEventCount can be changed + // at runtime without restart. + this.events = new LinkedBlockingQueue<>(100_000); + this.maxCapacityCache = LiteMetadataUtil.getMaxClientEventCount(group, brokerController); + } + + private int getMaxCapacity() { + long now = System.currentTimeMillis(); + long ttl = brokerController.getBrokerConfig().getLiteEventCapacityCacheTtlMs(); + if (now - capacityRefreshTime > ttl) { + maxCapacityCache = LiteMetadataUtil.getMaxClientEventCount(group, brokerController); + capacityRefreshTime = now; + } + return maxCapacityCache; } // return false if and only if the queue is full, has race condition with poll(), but no side effect. public boolean offer(String event) { - if (events.remainingCapacity() == 0) { + if (events.size() >= getMaxCapacity()) { return false; } boolean rst; @@ -486,7 +508,8 @@ public boolean maybeBlock() { public boolean isLowWaterMark() { int used = events.size(); - return (double) used / (used + events.remainingCapacity()) < LOW_WATER_MARK; + int maxCapacity = getMaxCapacity(); + return maxCapacity <= 0 || (double) used / maxCapacity < LOW_WATER_MARK; } public boolean isActiveConsuming() { @@ -516,7 +539,7 @@ public void onUnregister(String clientId, String group, String lmqName) { } /** - * Mostly triggered when client channel closed, ensure that lite subscriptions is cleared before. + * Mostly triggered when client channel closed, ensure that lite subscriptions is cleared before. */ @Override public void onRemoveAll(String clientId, String group) { @@ -553,10 +576,12 @@ public String next() { static class LiteSubscriptionIterator implements Iterator { private final Iterator iterator; private final String parentTopic; + public LiteSubscriptionIterator(String parentTopic, Iterator iterator) { this.parentTopic = parentTopic; this.iterator = iterator; } + @Override public boolean hasNext() { return iterator.hasNext(); @@ -572,6 +597,7 @@ protected static class FullDispatchRequest { private final String clientId; private final String group; private final long timestamp; + public FullDispatchRequest(String clientId, String group, long delayMillis) { this.clientId = clientId; this.group = group; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java index 31d5562f928..36e4ae23780 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java @@ -580,9 +580,9 @@ public void testDoFullDispatchForClientNormalCase() { when(consumerOffsetManager.queryOffset(group, lmqName, 0)).thenReturn(50L); LiteEventDispatcher.ClientEventSet eventSet = spy(liteEventDispatcher.new ClientEventSet(group)); - when(eventSet.maybeBlock()).thenReturn(false); - when(eventSet.isLowWaterMark()).thenReturn(true); - when(eventSet.offer(lmqName)).thenReturn(true); + doReturn(false).when(eventSet).maybeBlock(); + doReturn(true).when(eventSet).isLowWaterMark(); + doReturn(true).when(eventSet).offer(lmqName); liteEventDispatcher.clientEventMap.put(clientId, eventSet); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 08e27a20ee3..8f3ea6b6c88 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -558,6 +558,8 @@ public class BrokerConfig extends BrokerIdentity { private int maxClientEventCount = 100; + private long liteEventCapacityCacheTtlMs = 5000; + private long liteEventFullDispatchDelayTime = 10 * 1000; private long liteEventFullDispatchDelayTimeForWildcardGroup = 10 * 1000; @@ -2441,6 +2443,14 @@ public void setMaxClientEventCount(int maxClientEventCount) { this.maxClientEventCount = maxClientEventCount; } + public long getLiteEventCapacityCacheTtlMs() { + return liteEventCapacityCacheTtlMs; + } + + public void setLiteEventCapacityCacheTtlMs(long liteEventCapacityCacheTtlMs) { + this.liteEventCapacityCacheTtlMs = liteEventCapacityCacheTtlMs; + } + public long getLiteEventFullDispatchDelayTime() { return liteEventFullDispatchDelayTime; }