Skip to content

Commit

Permalink
[improve][broker] Exclude system topics from namespace level publish …
Browse files Browse the repository at this point in the history
…and dispatch rate limiting (apache#23589)
  • Loading branch information
poorbarcode authored Nov 12, 2024
1 parent 0969869 commit 9bcbb20
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
}
if (!isSystemTopic()) {
updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
}
topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies);
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Expand All @@ -305,7 +309,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
namespacePolicies.deduplicationSnapshotIntervalSeconds);
updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::isActive).orElse(null));
Expand All @@ -326,7 +329,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
updateNamespaceSubscriptionDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;

public class DisabledPublishRateLimiter implements PublishRateLimiter {

public static final DisabledPublishRateLimiter INSTANCE = new DisabledPublishRateLimiter();

private DisabledPublishRateLimiter() {}

@Override
public void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) {

}

@Override
public void update(Policies policies, String clusterName) {

}

@Override
public void update(PublishRate maxPublishRate) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
package org.apache.pulsar.broker.service.persistent;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.DisabledPublishRateLimiter;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.Policies;

public class SystemTopic extends PersistentTopic {

Expand Down Expand Up @@ -111,4 +116,19 @@ public EntryFilters getEntryFiltersPolicy() {
public List<EntryFilter> getEntryFilters() {
return null;
}

@Override
public PublishRateLimiter getBrokerPublishRateLimiter() {
return DisabledPublishRateLimiter.INSTANCE;
}

@Override
public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
// nothing todo.
}

@Override
public Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
Expand Down Expand Up @@ -73,6 +75,22 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti
}
}

@Test
public void testSystemTopicPublishNonBlock() throws Exception {
super.baseSetup();
PublishRate publishRate = new PublishRate(1,10);
admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
PulsarAdmin admin1 = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString() : brokerUrlTls.toString()).readTimeout(5, TimeUnit.SECONDS).build();
admin1.topics().createNonPartitionedTopic(topic);
admin1.topicPolicies().setDeduplicationStatus(topic, true);
admin1.topicPolicies().setDeduplicationStatus(topic, false);
// cleanup.
admin.namespaces().removePublishRate("prop/ns-abc");
admin1.close();
}

@Test
public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -48,6 +49,8 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -214,6 +217,42 @@ public void testMessageRateDynamicallyChange() throws Exception {
producer.close();
}

@SuppressWarnings("deprecation")
@Test
public void testSystemTopicDeliveryNonBlock() throws Exception {
final String namespace = "my-property/throttling_ns";
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
final String topicName = "persistent://" + namespace + "/" + UUID.randomUUID().toString().replaceAll("-", "");
admin.topics().createNonPartitionedTopic(topicName);
// Set a rate limitation.
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(1)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(360)
.build();
admin.namespaces().setDispatchRate(namespace, dispatchRate);

// Verify the limitation does not take effect. in other words, the topic policies should takes effect.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
admin.topicPolicies().setPublishRate(topicName, new PublishRate(1000, 1000));
Awaitility.await().untilAsserted(() -> {
assertNotNull(persistentTopic.getHierarchyTopicPolicies().getPublishRate().getTopicValue());
});
admin.topicPolicies().setRetention(topicName, new RetentionPolicies(1000, 1000));
Awaitility.await().untilAsserted(() -> {
assertNotNull(persistentTopic.getHierarchyTopicPolicies().getRetentionPolicies().getTopicValue());
});
admin.topicPolicies().setMessageTTL(topicName, 1000);
Awaitility.await().untilAsserted(() -> {
assertNotNull(persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().getTopicValue());
});

// cleanup.
admin.topics().delete(topicName);
admin.namespaces().removeDispatchRate(namespace);
}

/**
* verify: consumer should not receive all messages due to message-rate throttling
*
Expand Down

0 comments on commit 9bcbb20

Please sign in to comment.