diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java index 1c3df152c..20990020d 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java @@ -13,14 +13,14 @@ package com.baidu.bifromq.dist.entity; -import static com.baidu.bifromq.dist.util.TopicUtil.escape; -import static com.baidu.bifromq.dist.util.TopicUtil.isNormalTopicFilter; -import static com.baidu.bifromq.dist.util.TopicUtil.unescape; +import static com.baidu.bifromq.util.TopicUtil.escape; +import static com.baidu.bifromq.util.TopicUtil.unescape; import static com.baidu.bifromq.util.TopicConst.DELIMITER_CHAR; import static com.baidu.bifromq.util.TopicConst.NUL; import static com.baidu.bifromq.util.TopicConst.NUL_CHAR; import static com.baidu.bifromq.util.TopicConst.ORDERED_SHARE; import static com.baidu.bifromq.util.TopicConst.UNORDERED_SHARE; +import static com.baidu.bifromq.util.TopicUtil.isNormalTopicFilter; import static com.google.protobuf.ByteString.copyFromUtf8; import com.baidu.bifromq.dist.rpc.proto.GroupMatchRecord; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/GroupMatching.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/GroupMatching.java index d0424065a..2674cd205 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/GroupMatching.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/GroupMatching.java @@ -16,7 +16,7 @@ import static com.baidu.bifromq.util.TopicConst.DELIMITER_CHAR; import static com.baidu.bifromq.util.TopicConst.ORDERED_SHARE; import static com.baidu.bifromq.util.TopicConst.UNORDERED_SHARE; -import static com.baidu.bifromq.dist.util.TopicUtil.unescape; +import static com.baidu.bifromq.util.TopicUtil.unescape; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/NormalMatching.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/NormalMatching.java index ffb3b13e0..c14e4343a 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/NormalMatching.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/NormalMatching.java @@ -14,7 +14,7 @@ package com.baidu.bifromq.dist.entity; -import static com.baidu.bifromq.dist.util.TopicUtil.unescape; +import static com.baidu.bifromq.util.TopicUtil.unescape; import static com.baidu.bifromq.util.TopicConst.NUL; import com.baidu.bifromq.type.MatchInfo; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicUtil.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicUtil.java deleted file mode 100644 index 64f2dcb15..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicUtil.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.util.TopicConst.DELIMITER; -import static com.baidu.bifromq.util.TopicConst.DELIMITER_CHAR; -import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; -import static com.baidu.bifromq.util.TopicConst.NUL; -import static com.baidu.bifromq.util.TopicConst.NUL_CHAR; -import static com.baidu.bifromq.util.TopicConst.ORDERED_SHARE; -import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; -import static com.baidu.bifromq.util.TopicConst.UNORDERED_SHARE; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.function.Function; - -public class TopicUtil { - public static String escape(String topicFilter) { - assert !topicFilter.contains(NUL); - return topicFilter.replace(DELIMITER, NUL); - } - - public static String unescape(String topicFilter) { - return topicFilter.replace(NUL, DELIMITER); - } - - public static List parse(String tenantId, String topic, boolean isEscaped) { - List topicLevels = new ArrayList<>(); - topicLevels.add(tenantId); - return parse(topic, isEscaped, topicLevels); - } - - // parse a topic or topic filter string into a list of topic levels - // eg. "/" -> ["",""], "/a" -> ["",a], "a/" -> [a,""] - public static List parse(String topic, boolean isEscaped) { - return parse(topic, isEscaped, new ArrayList<>()); - } - - // parse a topic or topic filter string into a list of topic levels - // eg. "/" -> ["",""], "/a" -> ["",a], "a/" -> [a,""] - private static List parse(String topic, boolean isEscaped, List topicLevels) { - char splitter = isEscaped ? NUL_CHAR : DELIMITER_CHAR; - StringBuilder tl = new StringBuilder(); - for (int i = 0; i < topic.length(); i++) { - if (topic.charAt(i) == splitter) { - topicLevels.add(tl.toString()); - tl.delete(0, tl.length()); - } else { - tl.append(topic.charAt(i)); - } - } - topicLevels.add(tl.toString()); - return topicLevels; - } - - public static boolean isWildcardTopicFilter(String topicFilter) { - return topicFilter.contains(SINGLE_WILDCARD) || topicFilter.contains(MULTI_WILDCARD); - } - - public static boolean isNormalTopicFilter(String topicFilter) { - return !isUnorderedShared(topicFilter) && !isOrderedShared(topicFilter); - } - - public static boolean isUnorderedShared(String topicFilter) { - return topicFilter.startsWith(UNORDERED_SHARE); - } - - public static boolean isOrderedShared(String topicFilter) { - return topicFilter.startsWith(ORDERED_SHARE); - } - - public static String fastJoin(CharSequence delimiter, Iterable strings) { - StringBuilder sb = new StringBuilder(); - Iterator itr = strings.iterator(); - while (itr.hasNext()) { - sb.append(itr.next()); - if (itr.hasNext()) { - sb.append(delimiter); - } - } - return sb.toString(); - } - - public static String fastJoin(CharSequence delimiter, Iterable items, - Function toCharSequence) { - StringBuilder sb = new StringBuilder(); - Iterator itr = items.iterator(); - while (itr.hasNext()) { - sb.append(toCharSequence.apply(itr.next())); - if (itr.hasNext()) { - sb.append(delimiter); - } - } - return sb.toString(); - } -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java index 664f43f82..6df8af846 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java @@ -13,8 +13,8 @@ package com.baidu.bifromq.dist; -import static com.baidu.bifromq.dist.util.TopicUtil.fastJoin; -import static com.baidu.bifromq.dist.util.TopicUtil.parse; +import static com.baidu.bifromq.util.TopicUtil.fastJoin; +import static com.baidu.bifromq.util.TopicUtil.parse; import static com.baidu.bifromq.util.TopicConst.DELIMITER; import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; import static com.baidu.bifromq.util.TopicConst.NUL; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcher.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcher.java index e1878bbb3..ba1a68a34 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcher.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcher.java @@ -19,7 +19,7 @@ import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; -import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.util.TopicUtil; import java.util.List; import java.util.Optional; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java index a3e69c7fb..335049516 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java @@ -13,7 +13,7 @@ package com.baidu.bifromq.dist; -import static com.baidu.bifromq.dist.util.TopicUtil.escape; +import static com.baidu.bifromq.util.TopicUtil.escape; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/entity/EntityUtilTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/entity/EntityUtilTest.java index 404694c0a..0990e9a25 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/entity/EntityUtilTest.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/entity/EntityUtilTest.java @@ -17,7 +17,7 @@ import static com.baidu.bifromq.dist.entity.EntityUtil.parseTopicFilter; import static com.baidu.bifromq.dist.entity.EntityUtil.toMatchRecordKey; import static com.baidu.bifromq.dist.entity.EntityUtil.toQInboxId; -import static com.baidu.bifromq.dist.util.TopicUtil.escape; +import static com.baidu.bifromq.util.TopicUtil.escape; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java index ce5bd4875..23c1ac2eb 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java @@ -15,15 +15,15 @@ import static com.baidu.bifromq.dist.TestUtil.randomTopic; import static com.baidu.bifromq.dist.TestUtil.randomTopicFilter; -import static com.baidu.bifromq.dist.util.TopicUtil.escape; -import static com.baidu.bifromq.dist.util.TopicUtil.fastJoin; -import static com.baidu.bifromq.dist.util.TopicUtil.parse; +import static com.baidu.bifromq.util.TopicUtil.escape; +import static com.baidu.bifromq.util.TopicUtil.fastJoin; +import static com.baidu.bifromq.util.TopicUtil.parse; import static com.baidu.bifromq.util.TopicConst.NUL; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import com.baidu.bifromq.dist.TestUtil; -import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.util.TopicUtil; import com.baidu.bifromq.util.TopicConst; import java.util.ArrayList; import java.util.HashSet; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmarkState.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmarkState.java index 52d4d9120..70fcbba3f 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmarkState.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmarkState.java @@ -15,7 +15,7 @@ import com.baidu.bifromq.dist.trie.TopicTrieNode; import com.baidu.bifromq.dist.TestUtil; -import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.util.TopicUtil; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.openjdk.jmh.annotations.Level; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicUtilTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicUtilTest.java deleted file mode 100644 index 8dab26511..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicUtilTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - - -import static com.baidu.bifromq.dist.util.TopicUtil.escape; -import static com.baidu.bifromq.dist.util.TopicUtil.isWildcardTopicFilter; -import static com.baidu.bifromq.dist.util.TopicUtil.parse; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.Test; - -@Slf4j -public class TopicUtilTest { - @Test - public void check() { - assertTrue(isWildcardTopicFilter("#")); - assertTrue(isWildcardTopicFilter("+")); - assertTrue(isWildcardTopicFilter("/#")); - assertTrue(isWildcardTopicFilter("/+")); - assertFalse(isWildcardTopicFilter("/")); - } - - @Test - public void testParse() { - assertEquals(parse("", false), Lists.newArrayList("")); - assertEquals(parse(" ", false), Lists.newArrayList(" ")); - assertEquals(parse("/", false), Lists.newArrayList("", "")); - assertEquals(parse(escape("/"), true), Lists.newArrayList("", "")); - assertEquals(parse("//", false), Lists.newArrayList("", "", "")); - assertEquals(parse(escape("//"), true), Lists.newArrayList("", "", "")); - assertEquals(parse(" //", false), Lists.newArrayList(" ", "", "")); - assertEquals(parse(escape(" //"), true), Lists.newArrayList(" ", "", "")); - assertEquals(parse(" / / ", false), Lists.newArrayList(" ", " ", " ")); - assertEquals(parse(escape(" / / "), true), Lists.newArrayList(" ", " ", " ")); - assertEquals(parse("a/", false), Lists.newArrayList("a", "")); - assertEquals(parse(escape("a/"), true), Lists.newArrayList("a", "")); - assertEquals(parse("a/b", false), Lists.newArrayList("a", "b")); - assertEquals(parse(escape("a/b"), true), Lists.newArrayList("a", "b")); - assertEquals(parse("a/b/", false), Lists.newArrayList("a", "b", "")); - assertEquals(parse(escape("a/b/"), true), Lists.newArrayList("a", "b", "")); - } - -} diff --git a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java index 14347d63d..8423975c2 100644 --- a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java +++ b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java @@ -39,7 +39,7 @@ import com.baidu.bifromq.dist.rpc.proto.DistServiceROCoProcInput; import com.baidu.bifromq.dist.trie.TopicFilterIterator; import com.baidu.bifromq.dist.trie.TopicTrieNode; -import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.util.TopicUtil; import com.baidu.bifromq.plugin.settingprovider.ISettingProvider; import com.baidu.bifromq.plugin.settingprovider.Setting; import com.baidu.bifromq.sysprops.props.DataPlaneBurstLatencyMillis; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DeliverExecutorGroup.java b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DeliverExecutorGroup.java index 49d2dbc00..8299e724f 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DeliverExecutorGroup.java +++ b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DeliverExecutorGroup.java @@ -13,7 +13,7 @@ package com.baidu.bifromq.dist.worker; -import static com.baidu.bifromq.dist.util.TopicUtil.escape; +import static com.baidu.bifromq.util.TopicUtil.escape; import static com.baidu.bifromq.metrics.TenantMetric.MqttPersistentFanOutBytes; import static com.baidu.bifromq.plugin.eventcollector.ThreadLocalEventPool.getLocal; import static com.bifromq.plugin.resourcethrottler.TenantResourceType.TotalPersistentFanOutBytesPerSeconds; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java index ad8b92af9..4adc57cd6 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java +++ b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java @@ -28,7 +28,7 @@ import static com.baidu.bifromq.dist.entity.EntityUtil.toGroupMatchRecordKey; import static com.baidu.bifromq.dist.entity.EntityUtil.toNormalMatchRecordKey; import static com.baidu.bifromq.dist.entity.EntityUtil.toScopedTopicFilter; -import static com.baidu.bifromq.dist.util.TopicUtil.isNormalTopicFilter; +import static com.baidu.bifromq.util.TopicUtil.isNormalTopicFilter; import static java.util.Collections.singletonMap; import com.baidu.bifromq.basekv.proto.Boundary; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TopicIndex.java b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/TopicIndex.java similarity index 81% rename from bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TopicIndex.java rename to bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/TopicIndex.java index 32ea64e6b..8b0a6672c 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TopicIndex.java +++ b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/TopicIndex.java @@ -11,13 +11,15 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.dist.worker; import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; -import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.util.TopicUtil; +import com.baidu.bifromq.util.index.Branch; +import com.baidu.bifromq.util.index.TopicLevelTrie; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -35,6 +37,8 @@ public Map, Action> selectBranch(Map> branches, if (currentLevel < topicLevels.size() - 1) { // not last level String topicLevelToMatch = topicLevels.get(currentLevel); + boolean matchParent = currentLevel + 1 == topicLevels.size() - 1 + && topicLevels.get(currentLevel + 1).equals(MULTI_WILDCARD); switch (topicLevelToMatch) { case SINGLE_WILDCARD -> { Map, Action> result = new HashMap<>(); @@ -44,30 +48,15 @@ public Map, Action> selectBranch(Map> branches, // + skip SYS topic continue; } - result.put(branch, Action.CONTINUE); - } - return result; - } - case MULTI_WILDCARD -> { - Map, Action> result = new HashMap<>(); - for (Map.Entry> entry : branches.entrySet()) { - Branch branch = entry.getValue(); - if (currentLevel == 0 && entry.getKey().startsWith(SYS_PREFIX)) { - // # skip SYS topic - continue; - } - result.put(branch, Action.MATCH_AND_CONTINUE); + result.put(branch, matchParent ? Action.MATCH_AND_CONTINUE : Action.CONTINUE); } return result; } default -> { + assert !topicLevelToMatch.equals(MULTI_WILDCARD) : "MULTI_WILDCARD should be the last level"; if (branches.containsKey(topicLevelToMatch)) { - if (currentLevel + 1 < topicLevels.size() - && topicLevels.get(currentLevel + 1).equals(MULTI_WILDCARD)) { - // # match parent level - return Map.of(branches.get(topicLevelToMatch), Action.MATCH_AND_CONTINUE); - } - return Map.of(branches.get(topicLevelToMatch), Action.CONTINUE); + return Map.of(branches.get(topicLevelToMatch), + matchParent ? Action.MATCH_AND_CONTINUE : Action.CONTINUE); } return Collections.emptyMap(); } diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCache.java b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCache.java index 9864073aa..5852bbbdf 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCache.java +++ b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCache.java @@ -16,10 +16,10 @@ import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.dist.entity.GroupMatching; import com.baidu.bifromq.dist.entity.Matching; -import com.baidu.bifromq.dist.worker.index.TopicIndex; import com.baidu.bifromq.metrics.ITenantMeter; import com.baidu.bifromq.metrics.TenantMetric; import com.baidu.bifromq.sysprops.props.DistMaxCachedRoutesPerTenant; +import com.baidu.bifromq.dist.worker.TopicIndex; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Expiry; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteMatcher.java b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteMatcher.java index def45e2d4..f0eaa7362 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteMatcher.java +++ b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/cache/TenantRouteMatcher.java @@ -24,7 +24,7 @@ import com.baidu.bifromq.dist.entity.Matching; import com.baidu.bifromq.dist.trie.TopicFilterIterator; import com.baidu.bifromq.dist.trie.TopicTrieNode; -import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.util.TopicUtil; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import io.micrometer.core.instrument.Timer; diff --git a/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/index/TopicIndexTest.java b/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/TopicIndexTest.java similarity index 88% rename from bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/index/TopicIndexTest.java rename to bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/TopicIndexTest.java index 4c1b04be9..669a0e7c4 100644 --- a/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/index/TopicIndexTest.java +++ b/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/TopicIndexTest.java @@ -11,12 +11,12 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.dist.worker; import static org.testng.Assert.assertEquals; -import com.google.common.collect.Sets; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -48,16 +48,21 @@ public void testMatch() { assertMatch(topicIndex.match("")); assertMatch(topicIndex.match("fakeTopic")); - assertMatch(topicIndex.match("+"), "a"); assertMatch(topicIndex.match("#"), "/", "/a", "/b", "a", "a/", "a/b", "a/b/c"); + assertMatch(topicIndex.match("+"), "a"); + assertMatch(topicIndex.match("+/#"), "/", "/a", "/b", "a", "a/", "a/b", "a/b/c"); + assertMatch(topicIndex.match("+/+"), "/", "/a", "/b", "a/", "a/b"); + assertMatch(topicIndex.match("+/+/#"), "/", "/a", "/b", "a/", "a/b", "a/b/c"); assertMatch(topicIndex.match("/+"), "/", "/a", "/b"); + assertMatch(topicIndex.match("/+/#"), "/", "/a", "/b"); assertMatch(topicIndex.match("/#"), "/", "/a", "/b"); assertMatch(topicIndex.match("a/+"), "a/", "a/b"); assertMatch(topicIndex.match("a/#"), "a", "a/", "a/b", "a/b/c"); assertMatch(topicIndex.match("$a/+"), "$a/", "$a/b"); + assertMatch(topicIndex.match("$a/+/#"), "$a/", "$a/b"); assertMatch(topicIndex.match("$a/#"), "$a", "$a/", "$a/b"); } @@ -104,6 +109,6 @@ private CompletableFuture add(String... topics) { } private void assertMatch(List matches, String... expected) { - assertEquals(Sets.newHashSet(matches), Set.of(expected)); + assertEquals(new HashSet<>(matches), Set.of(expected)); } } diff --git a/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCacheTest.java b/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCacheTest.java index 1700b7660..0767b92ce 100644 --- a/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCacheTest.java +++ b/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/cache/TenantRouteCacheTest.java @@ -15,7 +15,7 @@ import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static com.baidu.bifromq.dist.entity.EntityUtil.toQInboxId; -import static com.baidu.bifromq.dist.util.TopicUtil.unescape; +import static com.baidu.bifromq.util.TopicUtil.unescape; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; diff --git a/bifromq-plugin/bifromq-plugin-event-collector/src/main/java/com/baidu/bifromq/plugin/eventcollector/mqttbroker/retainhandling/MsgRetainedError.java b/bifromq-plugin/bifromq-plugin-event-collector/src/main/java/com/baidu/bifromq/plugin/eventcollector/mqttbroker/retainhandling/MsgRetainedError.java index c7f212111..f8c29a96a 100644 --- a/bifromq-plugin/bifromq-plugin-event-collector/src/main/java/com/baidu/bifromq/plugin/eventcollector/mqttbroker/retainhandling/MsgRetainedError.java +++ b/bifromq-plugin/bifromq-plugin-event-collector/src/main/java/com/baidu/bifromq/plugin/eventcollector/mqttbroker/retainhandling/MsgRetainedError.java @@ -18,11 +18,13 @@ import java.nio.ByteBuffer; import lombok.Getter; import lombok.Setter; +import lombok.ToString; import lombok.experimental.Accessors; @Getter @Setter @Accessors(fluent = true, chain = true) +@ToString(callSuper = true) public final class MsgRetainedError extends RetainEvent { private String topic; diff --git a/bifromq-retain/bifromq-retain-coproc-proto/pom.xml b/bifromq-retain/bifromq-retain-coproc-proto/pom.xml index 63fb48f8e..b7c435ff3 100644 --- a/bifromq-retain/bifromq-retain-coproc-proto/pom.xml +++ b/bifromq-retain/bifromq-retain-coproc-proto/pom.xml @@ -25,6 +25,10 @@ bifromq-retain-coproc-proto + + com.baidu.bifromq + bifromq-util + com.baidu.bifromq bifromq-common-type diff --git a/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/KeyUtil.java b/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/KeyUtil.java index cd1b402b6..aa7fbfdba 100644 --- a/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/KeyUtil.java +++ b/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/KeyUtil.java @@ -13,9 +13,12 @@ package com.baidu.bifromq.retain.utils; -import static com.baidu.bifromq.retain.utils.TopicUtil.NUL; -import static com.baidu.bifromq.retain.utils.TopicUtil.escape; -import static com.baidu.bifromq.retain.utils.TopicUtil.parse; +import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; +import static com.baidu.bifromq.util.TopicConst.NUL; +import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; +import static com.baidu.bifromq.util.TopicUtil.escape; +import static com.baidu.bifromq.util.TopicUtil.fastJoin; +import static com.baidu.bifromq.util.TopicUtil.parse; import static com.google.protobuf.ByteString.copyFromUtf8; import static com.google.protobuf.UnsafeByteOperations.unsafeWrap; @@ -35,11 +38,30 @@ public static ByteString tenantNS(String tenantId) { return SCHEMA_VER.concat(toByteString(tenantIdBS.size())).concat(tenantIdBS); } + public static ByteString retainKey(String tenantId, String topic) { + return retainKey(tenantNS(tenantId), topic); + } + public static ByteString retainKey(ByteString tenantNS, String topic) { return tenantNS.concat(unsafeWrap(new byte[] {(byte) parse(topic, false).size()})) .concat(copyFromUtf8(escape(topic))); } + public static List filterPrefix(List filterLevels) { + int firstWildcard = filterLevels.indexOf(SINGLE_WILDCARD); + if (firstWildcard == -1) { + assert filterLevels.get(filterLevels.size() - 1).equals(MULTI_WILDCARD); + firstWildcard = filterLevels.size() - 1; + } + return filterLevels.subList(0, firstWildcard); + } + + public static ByteString retainKeyPrefix(String tenantId, int levels, List filterPrefix) { + return tenantNS(tenantId) + .concat(unsafeWrap(new byte[] {(byte) levels})) + .concat(copyFromUtf8(fastJoin(NUL, filterPrefix))); + } + public static ByteString retainKeyPrefix(ByteString tenantNS, List topicFilterLevels) { ByteString prefix = ByteString.empty(); byte leastLevels = 0; diff --git a/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/TopicUtil.java b/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/TopicUtil.java index 3d0a049fc..52da490c1 100644 --- a/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/TopicUtil.java +++ b/bifromq-retain/bifromq-retain-coproc-proto/src/main/java/com/baidu/bifromq/retain/utils/TopicUtil.java @@ -13,22 +13,9 @@ package com.baidu.bifromq.retain.utils; -import java.util.ArrayList; import java.util.List; public class TopicUtil { - public static final String NUL = String.valueOf('\u0000'); - public static final char TOPIC_SEPARATOR = '/'; - - public static boolean isWildcardTopicFilter(String topicFilter) { - return topicFilter.contains("+") || topicFilter.contains("#"); - } - - public static String escape(String topicFilter) { - assert !topicFilter.contains(NUL); - return topicFilter.replace("/", NUL); - } - public static boolean match(List topicLevels, List matchLevels) { boolean matchMultiLevel = matchLevels.get(matchLevels.size() - 1).equals("#"); if (!matchMultiLevel && topicLevels.size() != matchLevels.size()) { @@ -49,22 +36,4 @@ public static boolean match(List topicLevels, List matchLevels) } return true; } - - public static List parse(String topic, boolean isEscaped) { - // parse a topic or topic filter string into a list of topic levels - // eg. "/" -> ["",""], "/a" -> ["",a], "a/" -> [a,""] - List topicLevels = new ArrayList<>(); - char splitter = isEscaped ? '\u0000' : TOPIC_SEPARATOR; - StringBuilder tl = new StringBuilder(); - for (int i = 0; i < topic.length(); i++) { - if (topic.charAt(i) == splitter) { - topicLevels.add(tl.toString()); - tl.delete(0, tl.length()); - } else { - tl.append(topic.charAt(i)); - } - } - topicLevels.add(tl.toString()); - return topicLevels; - } } diff --git a/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/KeyUtilTest.java b/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/KeyUtilTest.java index ba03593be..e8874cc10 100644 --- a/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/KeyUtilTest.java +++ b/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/KeyUtilTest.java @@ -20,8 +20,8 @@ import static com.baidu.bifromq.retain.utils.KeyUtil.retainKey; import static com.baidu.bifromq.retain.utils.KeyUtil.retainKeyPrefix; import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; -import static com.baidu.bifromq.retain.utils.TopicUtil.NUL; -import static com.baidu.bifromq.retain.utils.TopicUtil.parse; +import static com.baidu.bifromq.util.TopicConst.NUL; +import static com.baidu.bifromq.util.TopicUtil.parse; import static com.google.protobuf.ByteString.copyFromUtf8; import static com.google.protobuf.UnsafeByteOperations.unsafeWrap; import static org.testng.Assert.assertEquals; diff --git a/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/TopicUtilTest.java b/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/TopicUtilTest.java index 9bca21c38..58c5c7456 100644 --- a/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/TopicUtilTest.java +++ b/bifromq-retain/bifromq-retain-coproc-proto/src/test/java/com/baidu/bifromq/retain/utils/TopicUtilTest.java @@ -14,7 +14,7 @@ package com.baidu.bifromq.retain.utils; import static com.baidu.bifromq.retain.utils.TopicUtil.match; -import static com.baidu.bifromq.retain.utils.TopicUtil.parse; +import static com.baidu.bifromq.util.TopicUtil.parse; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; diff --git a/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java b/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java index 039b09805..8687a7c1d 100644 --- a/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java +++ b/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java @@ -14,17 +14,17 @@ package com.baidu.bifromq.retain.store.gc; import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; -import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.upperBound; import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; import static com.baidu.bifromq.retain.utils.MessageUtil.buildGCRequest; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; import com.baidu.bifromq.basekv.client.KVRangeSetting; +import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.basekv.store.proto.KVRangeRWRequest; import com.baidu.bifromq.basekv.store.proto.RWCoProcInput; import java.util.Arrays; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -44,31 +44,25 @@ public CompletableFuture gc(long reqId, @Nullable String tenantId, @Nullable Integer expirySeconds, long now) { - if (tenantId != null) { - Optional rangeSettingOpt = - findByKey(tenantNS(tenantId), storeClient.latestEffectiveRouter()); - if (rangeSettingOpt.isEmpty()) { - return CompletableFuture.completedFuture(Result.ERROR); - } - KVRangeSetting rangeSetting = rangeSettingOpt.get(); - return gcRange(reqId, rangeSetting, tenantId, expirySeconds, now); - } else { - CompletableFuture[] gcFutures = findByBoundary(FULL_BOUNDARY, storeClient.latestEffectiveRouter()) - .stream() - .filter(k -> localServerId == null || k.leader.equals(localServerId)) - .map(rangeSetting -> gcRange(reqId, rangeSetting, null, expirySeconds, now)) - .toArray(CompletableFuture[]::new); - return CompletableFuture.allOf(gcFutures) - .thenApply(v -> Arrays.stream(gcFutures).map(CompletableFuture::join).toList()) - .thenApply(v -> { - log.debug("All range gc succeed"); - return v.stream().anyMatch(r -> r != Result.OK) ? Result.ERROR : Result.OK; - }) - .exceptionally(e -> { - log.error("Some range gc failed"); - return Result.ERROR; - }); - } + Boundary boundary = tenantId != null ? Boundary.newBuilder() + .setStartKey(tenantNS(tenantId)) + .setEndKey(upperBound(tenantNS(tenantId))) + .build() : FULL_BOUNDARY; + CompletableFuture[] gcFutures = findByBoundary(boundary, storeClient.latestEffectiveRouter()) + .stream() + .filter(k -> localServerId == null || k.leader.equals(localServerId)) + .map(rangeSetting -> gcRange(reqId, rangeSetting, tenantId, expirySeconds, now)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(gcFutures) + .thenApply(v -> Arrays.stream(gcFutures).map(CompletableFuture::join).toList()) + .thenApply(v -> { + log.debug("All range gc succeed"); + return v.stream().anyMatch(r -> r != Result.OK) ? Result.ERROR : Result.OK; + }) + .exceptionally(e -> { + log.error("Some range gc failed"); + return Result.ERROR; + }); } private CompletableFuture gcRange(long reqId, diff --git a/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java b/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java index 3a7a8eec6..cd5764a8a 100644 --- a/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java +++ b/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java @@ -66,7 +66,7 @@ public void testGCNonExistTenant() { IRetainStoreGCProcessor.Result result = gcProcessor.gc(System.nanoTime(), tenantId, null, HLC.INST.getPhysical()).join(); - assertEquals(result, IRetainStoreGCProcessor.Result.ERROR); + assertEquals(result, IRetainStoreGCProcessor.Result.OK); } @Test diff --git a/bifromq-retain/bifromq-retain-server/pom.xml b/bifromq-retain/bifromq-retain-server/pom.xml index 56e9fc465..fa4e0b418 100644 --- a/bifromq-retain/bifromq-retain-server/pom.xml +++ b/bifromq-retain/bifromq-retain-server/pom.xml @@ -36,6 +36,10 @@ com.baidu.bifromq bifromq-common-type + + com.baidu.bifromq + bifromq-util + com.baidu.bifromq bifromq-metrics diff --git a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/AbstractRetainServer.java b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/AbstractRetainServer.java index f8a93f2ff..8711202ca 100644 --- a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/AbstractRetainServer.java +++ b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/AbstractRetainServer.java @@ -27,7 +27,7 @@ abstract class AbstractRetainServer implements IRetainServer { this.retainService = new RetainService( new RetainStoreGCProcessor(builder.retainStoreClient, null), new MessageDeliverer(builder.subBrokerManager), - new MatchCallScheduler(builder.retainStoreClient), + new MatchCallScheduler(builder.retainStoreClient, builder.settingProvider), new RetainCallScheduler(builder.retainStoreClient)); } diff --git a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/RetainService.java b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/RetainService.java index 2cb5ad6d4..8ed88470e 100644 --- a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/RetainService.java +++ b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/RetainService.java @@ -63,13 +63,13 @@ public void retain(RetainRequest request, StreamObserver responseOb log.trace("Handling retain request:\n{}", request); response((tenantId, metadata) -> retainCallScheduler.schedule(request) .exceptionally(e -> { - log.error("Retain failed", e); if (e instanceof BackPressureException || e.getCause() instanceof BackPressureException) { return RetainReply.newBuilder() .setReqId(request.getReqId()) .setResult(RetainReply.Result.BACK_PRESSURE_REJECTED) .build(); } + log.debug("Retain failed", e); return RetainReply.newBuilder() .setReqId(request.getReqId()) .setResult(RetainReply.Result.ERROR) @@ -126,13 +126,13 @@ public void match(MatchRequest request, StreamObserver responseObser .build()); }) .exceptionally(e -> { - log.error("Match failed", e); if (e instanceof BackPressureException || e.getCause() instanceof BackPressureException) { return MatchReply.newBuilder() .setReqId(request.getReqId()) .setResult(MatchReply.Result.BACK_PRESSURE_REJECTED) .build(); } + log.debug("Match failed", e); return MatchReply.newBuilder() .setReqId(request.getReqId()) .setResult(MatchReply.Result.ERROR) diff --git a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/BatchMatchCall.java b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/BatchMatchCall.java index f88a381fc..7f6f52e4b 100644 --- a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/BatchMatchCall.java +++ b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/BatchMatchCall.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,73 +13,198 @@ package com.baidu.bifromq.retain.server.scheduler; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; +import static com.baidu.bifromq.retain.rpc.proto.MatchReply.Result.ERROR; +import static com.baidu.bifromq.retain.rpc.proto.MatchReply.Result.OK; +import static com.baidu.bifromq.retain.utils.KeyUtil.filterPrefix; +import static com.baidu.bifromq.retain.utils.KeyUtil.retainKeyPrefix; +import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; +import static com.baidu.bifromq.util.TopicUtil.isMultiWildcardTopicFilter; +import static com.baidu.bifromq.util.TopicUtil.isNormalTopicFilter; +import static com.baidu.bifromq.util.TopicUtil.isWildcardTopicFilter; +import static com.baidu.bifromq.util.TopicUtil.parse; + import com.baidu.bifromq.basehlc.HLC; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; -import com.baidu.bifromq.basekv.client.scheduler.BatchQueryCall; -import com.baidu.bifromq.basekv.client.scheduler.QueryCallBatcherKey; -import com.baidu.bifromq.basekv.proto.KVRangeId; +import com.baidu.bifromq.basekv.client.KVRangeSetting; +import com.baidu.bifromq.basekv.proto.Boundary; +import com.baidu.bifromq.basekv.store.proto.KVRangeRORequest; import com.baidu.bifromq.basekv.store.proto.ROCoProcInput; -import com.baidu.bifromq.basekv.store.proto.ROCoProcOutput; +import com.baidu.bifromq.basekv.store.proto.ReplyCode; +import com.baidu.bifromq.basescheduler.IBatchCall; import com.baidu.bifromq.basescheduler.ICallTask; +import com.baidu.bifromq.plugin.settingprovider.ISettingProvider; +import com.baidu.bifromq.plugin.settingprovider.Setting; +import com.baidu.bifromq.retain.rpc.proto.BatchMatchReply; import com.baidu.bifromq.retain.rpc.proto.BatchMatchRequest; import com.baidu.bifromq.retain.rpc.proto.MatchParam; import com.baidu.bifromq.retain.rpc.proto.MatchReply; import com.baidu.bifromq.retain.rpc.proto.MatchResult; -import com.baidu.bifromq.retain.rpc.proto.MatchResultPack; import com.baidu.bifromq.retain.rpc.proto.RetainServiceROCoProcInput; -import java.time.Duration; +import com.baidu.bifromq.retain.utils.KeyUtil; +import com.baidu.bifromq.type.TopicMessage; +import com.google.common.primitives.UnsignedBytes; +import com.google.protobuf.ByteString; +import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BatchMatchCall implements IBatchCall { + private final MatchCallBatcherKey batcherKey; + private final IBaseKVStoreClient retainStoreClient; + private final ISettingProvider settingProvider; + private final Queue> tasks = new ArrayDeque<>(128); + private Set topicFilters = new HashSet<>(128); -public class BatchMatchCall extends BatchQueryCall { - protected BatchMatchCall(KVRangeId rangeId, - IBaseKVStoreClient storeClient, - Duration pipelineExpiryTime) { - super(rangeId, storeClient, true, pipelineExpiryTime); + + public BatchMatchCall(MatchCallBatcherKey batcherKey, + IBaseKVStoreClient retainStoreClient, + ISettingProvider settingProvider) { + this.batcherKey = batcherKey; + this.retainStoreClient = retainStoreClient; + this.settingProvider = settingProvider; } @Override - protected ROCoProcInput makeBatch(Iterator matchRequestIterator) { - Map matchParamBuilders = new HashMap<>(128); - matchRequestIterator.forEachRemaining(request -> - matchParamBuilders.computeIfAbsent(request.tenantId(), k -> MatchParam.newBuilder()) - .setNow(HLC.INST.getPhysical()) - .putTopicFilters(request.topicFilter(), request.limit())); - long reqId = System.nanoTime(); - BatchMatchRequest.Builder reqBuilder = BatchMatchRequest - .newBuilder() - .setReqId(reqId); - matchParamBuilders.forEach((tenantId, matchParamsBuilder) -> - reqBuilder.putMatchParams(tenantId, matchParamsBuilder.build())); + public void add(ICallTask task) { + tasks.add(task); + topicFilters.add(task.call().topicFilter()); + } - return ROCoProcInput.newBuilder() - .setRetainService(RetainServiceROCoProcInput.newBuilder() - .setBatchMatch(reqBuilder.build()) - .build()) - .build(); + @Override + public void reset() { + topicFilters = new HashSet<>(128); } @Override - protected void handleOutput(Queue> batchedTasks, - ROCoProcOutput output) { - ICallTask task; - while ((task = batchedTasks.poll()) != null) { - task.resultPromise().complete(new MatchCallResult(MatchReply.Result.OK, output.getRetainService() - .getBatchMatch() - .getResultPackMap() - .getOrDefault(task.call().tenantId(), - MatchResultPack.getDefaultInstance()) - .getResultsOrDefault(task.call().topicFilter(), - MatchResult.getDefaultInstance()).getOk() - .getMessagesList())); + public CompletableFuture execute() { + long now = HLC.INST.getPhysical(); + long reqId = System.nanoTime(); + Map> topicFiltersByRange = + rangeLookup(topicFilters, retainStoreClient.latestEffectiveRouter()); + List> futures = new ArrayList<>(topicFiltersByRange.size()); + int limit = settingProvider.provide(Setting.RetainMessageMatchLimit, batcherKey.tenantId()); + for (KVRangeSetting rangeSetting : topicFiltersByRange.keySet()) { + Set topicFilters = topicFiltersByRange.get(rangeSetting); + futures.add(queryCoProc(BatchMatchRequest.newBuilder() + .putMatchParams(batcherKey.tenantId(), MatchParam.newBuilder() + .putAllTopicFilters(topicFilters.stream().collect(Collectors.toMap(k -> k, v -> limit))) + .setNow(now) + .build()) + .setReqId(reqId) + .build(), rangeSetting)); } + return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) + .handle((v, e) -> { + ICallTask task; + if (e != null) { + while ((task = tasks.poll()) != null) { + task.resultPromise() + .complete(new MatchCallResult(MatchReply.Result.ERROR, Collections.emptyList())); + } + } else { + // aggregate result from each reply + Map> aggregatedResults = new HashMap<>(); + for (CompletableFuture future : futures) { + BatchMatchReply reply = future.join(); + Map matchResults = + reply.getResultPackMap().get(batcherKey.tenantId()).getResultsMap(); + matchResults.forEach((topic, matchResult) -> + aggregatedResults.computeIfAbsent(topic, k -> new ArrayList<>()).add(matchResult)); + } + while ((task = tasks.poll()) != null) { + List matchResults = aggregatedResults.get(task.call().topicFilter()); + List messages = new LinkedList<>(); + int i = 0; + boolean anySucceed = false; + out: + for (MatchResult matchResult : matchResults) { + if (matchResult.hasOk()) { + for (TopicMessage message : matchResult.getOk().getMessagesList()) { + if (i++ < limit) { + messages.add(message); + } else { + break out; + } + } + anySucceed = true; + } + } + if (anySucceed) { + task.resultPromise().complete(new MatchCallResult(OK, messages)); + } else { + task.resultPromise().complete(new MatchCallResult(ERROR, Collections.emptyList())); + } + } + } + return null; + }); } - @Override - protected void handleException(ICallTask callTask, Throwable e) { - callTask.resultPromise().complete(new MatchCallResult(MatchReply.Result.ERROR, Collections.emptyList())); + private CompletableFuture queryCoProc(BatchMatchRequest request, KVRangeSetting rangeSetting) { + return retainStoreClient.query(rangeSetting.randomReplica(), KVRangeRORequest.newBuilder() + .setReqId(request.getReqId()) + .setKvRangeId(rangeSetting.id) + .setVer(rangeSetting.ver) + .setRoCoProc(ROCoProcInput.newBuilder() + .setRetainService(RetainServiceROCoProcInput.newBuilder() + .setBatchMatch(request) + .build()) + .build()) + .build()) + .thenApply(v -> { + if (v.getCode() == ReplyCode.Ok) { + BatchMatchReply batchMatchReply = v.getRoCoProcResult() + .getRetainService() + .getBatchMatch(); + assert batchMatchReply.getReqId() == request.getReqId(); + return batchMatchReply; + } + log.warn("Failed to exec ro co-proc[code={}]", v.getCode()); + throw new RuntimeException("Failed to exec rw co-proc"); + }); + } + + private Map> rangeLookup(Set topicFilters, + NavigableMap effectiveRouter) { + Map> topicFiltersByRange = new HashMap<>(); + for (String topicFilter : topicFilters) { + // not shared subscription + assert isNormalTopicFilter(topicFilter); + if (isWildcardTopicFilter(topicFilter)) { + List filterLevels = parse(topicFilter, false); + ByteString startKey = isMultiWildcardTopicFilter(topicFilter) + ? retainKeyPrefix(batcherKey.tenantId(), filterLevels.size() - 1, filterPrefix(filterLevels)) : + retainKeyPrefix(batcherKey.tenantId(), filterLevels.size(), filterPrefix(filterLevels)); + ByteString endKey = isMultiWildcardTopicFilter(topicFilter) + ? retainKeyPrefix(batcherKey.tenantId(), UnsignedBytes.MAX_VALUE, filterPrefix(filterLevels)) : + retainKeyPrefix(batcherKey.tenantId(), filterLevels.size() + 1, filterLevels); + Boundary topicBoundary = Boundary.newBuilder().setStartKey(startKey).setEndKey(endKey).build(); + List rangeSettingList = findByBoundary(topicBoundary, effectiveRouter); + for (KVRangeSetting rangeSetting : rangeSettingList) { + topicFiltersByRange.computeIfAbsent(rangeSetting, k -> new HashSet<>()).add(topicFilter); + } + } else { + ByteString retainKey = KeyUtil.retainKey(tenantNS(batcherKey.tenantId()), topicFilter); + Optional rangeSetting = findByKey(retainKey, retainStoreClient.latestEffectiveRouter()); + assert rangeSetting.isPresent(); + topicFiltersByRange.computeIfAbsent(rangeSetting.get(), k -> new HashSet<>()).add(topicFilter); + } + } + return topicFiltersByRange; } } diff --git a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallBatcher.java b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallBatcher.java new file mode 100644 index 000000000..480a84062 --- /dev/null +++ b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallBatcher.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.server.scheduler; + +import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basescheduler.Batcher; +import com.baidu.bifromq.basescheduler.IBatchCall; +import com.baidu.bifromq.plugin.settingprovider.ISettingProvider; + +class MatchCallBatcher extends Batcher { + private final IBaseKVStoreClient retainStoreClient; + private final ISettingProvider settingProvider; + + protected MatchCallBatcher(MatchCallBatcherKey batcherKey, + String name, + long tolerableLatencyNanos, + long burstLatencyNanos, + IBaseKVStoreClient retainStoreClient, + ISettingProvider settingProvider) { + super(batcherKey, name, tolerableLatencyNanos, burstLatencyNanos); + this.retainStoreClient = retainStoreClient; + this.settingProvider = settingProvider; + } + + @Override + protected IBatchCall newBatch() { + return new BatchMatchCall(batcherKey, retainStoreClient, settingProvider); + } +} diff --git a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallBatcherKey.java b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallBatcherKey.java new file mode 100644 index 000000000..09f0e75bf --- /dev/null +++ b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallBatcherKey.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.server.scheduler; + +public record MatchCallBatcherKey(String tenantId, int queueId) { + +} diff --git a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallScheduler.java b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallScheduler.java index 9845ac520..dcc5a9c70 100644 --- a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallScheduler.java +++ b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/MatchCallScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,60 +13,40 @@ package com.baidu.bifromq.retain.server.scheduler; -import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; - import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; -import com.baidu.bifromq.basekv.client.scheduler.QueryCallBatcher; -import com.baidu.bifromq.basekv.client.scheduler.QueryCallBatcherKey; -import com.baidu.bifromq.basekv.client.scheduler.QueryCallScheduler; +import com.baidu.bifromq.basescheduler.BatchCallScheduler; import com.baidu.bifromq.basescheduler.Batcher; -import com.baidu.bifromq.basescheduler.IBatchCall; +import com.baidu.bifromq.plugin.settingprovider.ISettingProvider; import com.baidu.bifromq.sysprops.props.DataPlaneBurstLatencyMillis; import com.baidu.bifromq.sysprops.props.DataPlaneTolerableLatencyMillis; -import com.google.protobuf.ByteString; import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; -import lombok.extern.slf4j.Slf4j; +import java.util.Optional; + +public class MatchCallScheduler extends BatchCallScheduler + implements IMatchCallScheduler { + private final IBaseKVStoreClient retainStoreClient; + private final ISettingProvider settingProvider; -@Slf4j -public class MatchCallScheduler extends QueryCallScheduler implements IMatchCallScheduler { - public MatchCallScheduler(IBaseKVStoreClient retainStoreClient) { - super("retain_server_match_batcher", retainStoreClient, + public MatchCallScheduler(IBaseKVStoreClient retainStoreClient, ISettingProvider settingProvider) { + super("retain_server_match_batcher", Duration.ofMillis(DataPlaneTolerableLatencyMillis.INSTANCE.get()), Duration.ofSeconds(DataPlaneBurstLatencyMillis.INSTANCE.get())); + this.retainStoreClient = retainStoreClient; + this.settingProvider = settingProvider; } @Override - protected Batcher newBatcher(String name, + protected Batcher newBatcher(String name, long tolerableLatencyNanos, long burstLatencyNanos, - QueryCallBatcherKey batcherKey) { - return new MatchCallBatcher(batcherKey, name, tolerableLatencyNanos, burstLatencyNanos, storeClient); - } - - @Override - protected int selectQueue(MatchCall request) { - return ThreadLocalRandom.current().nextInt(5); + MatchCallBatcherKey key) { + return new MatchCallBatcher(key, name, tolerableLatencyNanos, burstLatencyNanos, + retainStoreClient, settingProvider); } @Override - protected ByteString rangeKey(MatchCall request) { - return tenantNS(request.tenantId()); - } - - public static class MatchCallBatcher extends QueryCallBatcher { - - protected MatchCallBatcher(QueryCallBatcherKey batcherKey, - String name, - long tolerableLatencyNanos, - long burstLatencyNanos, - IBaseKVStoreClient storeClient) { - super(name, tolerableLatencyNanos, burstLatencyNanos, batcherKey, storeClient); - } - - @Override - protected IBatchCall newBatch() { - return new BatchMatchCall(batcherKey.id, storeClient, Duration.ofMinutes(5)); - } + protected Optional find(MatchCall call) { + // TODO: implement multi batcher for tenant + return Optional.of(new MatchCallBatcherKey(call.tenantId(), 0)); } } diff --git a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/RetainCallScheduler.java b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/RetainCallScheduler.java index 03d9a466b..c5307f7b3 100644 --- a/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/RetainCallScheduler.java +++ b/bifromq-retain/bifromq-retain-server/src/main/java/com/baidu/bifromq/retain/server/scheduler/RetainCallScheduler.java @@ -13,7 +13,7 @@ package com.baidu.bifromq.retain.server.scheduler; -import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; +import static com.baidu.bifromq.retain.utils.KeyUtil.retainKey; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; import com.baidu.bifromq.basekv.client.scheduler.MutationCallBatcher; @@ -51,7 +51,7 @@ protected Batcher newBatcher @Override protected ByteString rangeKey(RetainRequest request) { - return tenantNS(request.getPublisher().getTenantId()); + return retainKey(request.getPublisher().getTenantId(), request.getTopic()); } private static class RetainCallBatcher extends MutationCallBatcher { diff --git a/bifromq-retain/bifromq-retain-store/pom.xml b/bifromq-retain/bifromq-retain-store/pom.xml index 7a01db48a..9afa76863 100644 --- a/bifromq-retain/bifromq-retain-store/pom.xml +++ b/bifromq-retain/bifromq-retain-store/pom.xml @@ -52,6 +52,10 @@ com.baidu.bifromq base-kv-store-balance-controller + + com.baidu.bifromq + bifromq-util + com.baidu.bifromq bifromq-retain-gc diff --git a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProc.java b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProc.java index 130de1c00..be4639b74 100644 --- a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProc.java +++ b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProc.java @@ -13,16 +13,12 @@ package com.baidu.bifromq.retain.store; -import static com.baidu.bifromq.basekv.utils.BoundaryUtil.compare; -import static com.baidu.bifromq.basekv.utils.BoundaryUtil.upperBound; -import static com.baidu.bifromq.metrics.TenantMetric.MqttRetainNumGauge; -import static com.baidu.bifromq.metrics.TenantMetric.MqttRetainSpaceGauge; +import static com.baidu.bifromq.retain.utils.KeyUtil.isTenantNS; import static com.baidu.bifromq.retain.utils.KeyUtil.parseTenantId; import static com.baidu.bifromq.retain.utils.KeyUtil.parseTenantNS; -import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; -import static com.baidu.bifromq.retain.utils.TopicUtil.isWildcardTopicFilter; +import static com.baidu.bifromq.retain.utils.KeyUtil.retainKey; +import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.basekv.proto.KVRangeId; @@ -36,7 +32,6 @@ import com.baidu.bifromq.basekv.store.proto.RWCoProcInput; import com.baidu.bifromq.basekv.store.proto.RWCoProcOutput; import com.baidu.bifromq.basekv.utils.KVRangeIdUtil; -import com.baidu.bifromq.metrics.ITenantMeter; import com.baidu.bifromq.retain.rpc.proto.BatchMatchReply; import com.baidu.bifromq.retain.rpc.proto.BatchMatchRequest; import com.baidu.bifromq.retain.rpc.proto.BatchRetainReply; @@ -53,21 +48,21 @@ import com.baidu.bifromq.retain.rpc.proto.RetainServiceROCoProcOutput; import com.baidu.bifromq.retain.rpc.proto.RetainServiceRWCoProcInput; import com.baidu.bifromq.retain.rpc.proto.RetainServiceRWCoProcOutput; -import com.baidu.bifromq.retain.rpc.proto.RetainSetMetadata; -import com.baidu.bifromq.retain.utils.KeyUtil; -import com.baidu.bifromq.retain.utils.TopicUtil; +import com.baidu.bifromq.retain.store.index.RetainTopicIndex; +import com.baidu.bifromq.retain.store.index.RetainedMsgInfo; import com.baidu.bifromq.type.Message; import com.baidu.bifromq.type.TopicMessage; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.time.Duration; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.SneakyThrows; @@ -76,8 +71,9 @@ @Slf4j class RetainStoreCoProc implements IKVRangeCoProc { private final Supplier rangeReaderProvider; - private final Map metadataMap = new ConcurrentHashMap<>(); + private final TenantsState tenantsState; private final String[] tags; + private RetainTopicIndex index; RetainStoreCoProc(String clusterId, String storeId, @@ -85,7 +81,8 @@ class RetainStoreCoProc implements IKVRangeCoProc { Supplier rangeReaderProvider) { this.tags = new String[] {"clusterId", clusterId, "storeId", storeId, "rangeId", KVRangeIdUtil.toString(id)}; this.rangeReaderProvider = rangeReaderProvider; - loadMetadata(); + this.tenantsState = new TenantsState(rangeReaderProvider.get(), tags); + load(); } @Override @@ -118,7 +115,7 @@ public Supplier mutate(RWCoProcInput input, IKVReader reader, IK } case GC -> { GCReply.Builder replyBuilder = GCReply.newBuilder(); - afterMutate.set(gc(coProcInput.getGc(), replyBuilder, reader, writer)); + afterMutate.set(gc(coProcInput.getGc(), replyBuilder, writer)); outputBuilder.setGc(replyBuilder); } } @@ -131,299 +128,169 @@ public Supplier mutate(RWCoProcInput input, IKVReader reader, IK @Override public void reset(Boundary boundary) { - clearMetrics(); - loadMetadata(); + load(); } @Override public void close() { - metadataMap.keySet().forEach(tenantId -> { - ITenantMeter.stopGauging(tenantId, MqttRetainNumGauge, tags); - ITenantMeter.stopGauging(tenantId, MqttRetainSpaceGauge, tags); - }); + index = null; + tenantsState.destroy(); } private CompletableFuture batchMatch(BatchMatchRequest request, IKVReader reader) { BatchMatchReply.Builder replyBuilder = BatchMatchReply.newBuilder().setReqId(request.getReqId()); - request.getMatchParamsMap().forEach((tenantId, matchParams) -> { + for (String tenantId : request.getMatchParamsMap().keySet()) { MatchResultPack.Builder resultPackBuilder = MatchResultPack.newBuilder(); - matchParams.getTopicFiltersMap().forEach((topicFilter, limit) -> { + for (String topicFilter : request.getMatchParamsMap().get(tenantId).getTopicFiltersMap().keySet()) { MatchResult.Builder resultBuilder = MatchResult.newBuilder(); try { resultBuilder.setOk(Matched.newBuilder() - .addAllMessages(match(tenantNS(tenantId), topicFilter, limit, matchParams.getNow(), reader))); + .addAllMessages(match(tenantId, topicFilter, + request.getMatchParamsMap().get(tenantId).getTopicFiltersMap().get(topicFilter), + request.getMatchParamsMap().get(tenantId).getNow(), reader))); } catch (Throwable e) { resultBuilder.setError(MatchError.getDefaultInstance()); } resultPackBuilder.putResults(topicFilter, resultBuilder.build()); - }); + } replyBuilder.putResultPack(tenantId, resultPackBuilder.build()); - }); + } return CompletableFuture.completedFuture(replyBuilder.build()); } - private List match(ByteString tenantNS, + private List match(String tenantId, String topicFilter, int limit, long now, IKVReader reader) throws Exception { if (limit == 0) { - // TODO: report event: nothing to match - return emptyList(); - } - Boundary range = Boundary.newBuilder() - .setStartKey(tenantNS) - .setEndKey(upperBound(tenantNS)) - .build(); - reader.refresh(); - IKVIterator itr = reader.iterator(); - itr.seek(range.getStartKey()); - if (!itr.isValid()) { return emptyList(); } - if (!isWildcardTopicFilter(topicFilter)) { - Optional val = reader.get(KeyUtil.retainKey(tenantNS, topicFilter)); + List matchedMsgInfos = index.match(tenantId, topicFilter); + List messages = new LinkedList<>(); + for (RetainedMsgInfo msgInfo : matchedMsgInfos) { + if (messages.size() >= limit) { + break; + } + Optional val = reader.get(retainKey(msgInfo.tenantId, msgInfo.topic)); if (val.isPresent()) { TopicMessage message = TopicMessage.parseFrom(val.get()); if (expireAt(message.getMessage()) > now) { - return singletonList(message); - } - } - return emptyList(); - } - // deal with wildcard topic filter - List matchLevels = TopicUtil.parse(topicFilter, false); - List messages = new LinkedList<>(); - itr.seek(KeyUtil.retainKeyPrefix(tenantNS, matchLevels)); - out: - while (itr.isValid() && compare(itr.key(), range.getEndKey()) < 0 && messages.size() < limit) { - List topicLevels = KeyUtil.parseTopic(itr.key()); - switch (RetainMatcher.match(topicLevels, matchLevels)) { - case MATCHED_AND_CONTINUE -> { - TopicMessage message = TopicMessage.parseFrom(itr.value()); - if (expireAt(message.getMessage()) > now) { - messages.add(message); - } - itr.next(); - } - case MATCHED_AND_STOP -> { - TopicMessage message = TopicMessage.parseFrom(itr.value()); - if (expireAt(message.getMessage()) > now) { - messages.add(message); - } - break out; - } - case MISMATCH_AND_CONTINUE -> itr.next(); - case MISMATCH_AND_STOP -> { - break out; + messages.add(message); } } -// // TODO: optimize single level wildcard match to stop early -// if (TopicUtil.match(topicLevels, matchLevels)) { -// TopicMessage message = TopicMessage.parseFrom(itr.value()); -// if (expireAt(message.getMessage()) > now) { -// messages.add(message); -// } -// } -// itr.next(); } return messages; } + private Runnable batchRetain(BatchRetainRequest request, BatchRetainReply.Builder replyBuilder, IKVReader reader, IKVWriter writer) { replyBuilder.setReqId(request.getReqId()); - Map toBeCached = new HashMap<>(); - request.getParamsMap().forEach((tenantId, retainParam) -> { - RetainSetMetadata.Builder metadataBuilder = getMetadata(tenantId) - .map(RetainSetMetadata::toBuilder) - .orElse(RetainSetMetadata.newBuilder().setEstExpire(Long.MAX_VALUE)); - Map results = - retain(tenantId, retainParam.getTopicMessagesMap(), metadataBuilder, reader, writer); + Map> addTopics = new HashMap<>(); + Map> updateTopics = new HashMap<>(); + Map> removeTopics = new HashMap<>(); + for (String tenantId : request.getParamsMap().keySet()) { + Map results = new HashMap<>(); + for (Map.Entry entry : + request.getParamsMap().get(tenantId).getTopicMessagesMap().entrySet()) { + String topic = entry.getKey(); + RetainMessage retainMessage = entry.getValue(); + try { + TopicMessage topicMessage = TopicMessage.newBuilder() + .setTopic(topic) + .setMessage(retainMessage.getMessage()) + .setPublisher(retainMessage.getPublisher()) + .build(); + ByteString retainKey = retainKey(tenantId, topicMessage.getTopic()); + List retainedMsgInfos = index.match(tenantId, topic); + if (topicMessage.getMessage().getPayload().isEmpty()) { + // delete existing retained + if (!retainedMsgInfos.isEmpty()) { + writer.delete(retainKey); + removeTopics.computeIfAbsent(tenantId, k -> new HashSet<>()).add(topic); + } + results.put(topic, RetainResult.Code.CLEARED); + continue; + } + if (retainedMsgInfos.isEmpty()) { + // retain new message + writer.put(retainKey, topicMessage.toByteString()); + addTopics.computeIfAbsent(tenantId, k -> new HashMap<>()) + .put(topic, topicMessage.getMessage()); + } else { + // replace existing + writer.put(retainKey, topicMessage.toByteString()); + updateTopics.computeIfAbsent(tenantId, k -> new HashMap<>()) + .put(topic, topicMessage.getMessage()); + } + results.put(topic, RetainResult.Code.RETAINED); + } catch (Throwable e) { + log.error("Retain failed", e); + results.put(topic, RetainResult.Code.ERROR); + } + } replyBuilder.putResults(tenantId, RetainResult.newBuilder() .putAllResults(results) .build()); - if (metadataBuilder.getCount() > 0) { - writer.put(tenantNS(tenantId), metadataBuilder.build().toByteString()); - } else { - writer.delete(tenantNS(tenantId)); - } - toBeCached.put(tenantId, metadataBuilder.build()); - }); + } return () -> { - toBeCached.forEach(this::cacheMetadata); + addTopics.forEach((tenantId, topics) -> { + topics.forEach( + (topic, msg) -> index.add(tenantId, topic, msg.getTimestamp(), msg.getExpiryInterval())); + tenantsState.increaseTopicCount(tenantId, topics.size()); + }); + updateTopics.forEach((tenantId, topics) -> { + topics.forEach((topic, msg) -> index.remove(tenantId, topic)); + topics.forEach((topic, msg) -> index.add(tenantId, topic, msg.getTimestamp(), msg.getExpiryInterval())); + }); + removeTopics.forEach((tenantId, topics) -> { + topics.forEach(topic -> index.remove(tenantId, topic)); + tenantsState.increaseTopicCount(tenantId, -topics.size()); + }); }; } - private Map retain(String tenantId, - Map retainMessages, - RetainSetMetadata.Builder metadataBuilder, - IKVReader reader, - IKVWriter writer) { - Map results = new HashMap<>(); - for (Map.Entry entry : retainMessages.entrySet()) { - String topic = entry.getKey(); - RetainMessage retainMessage = entry.getValue(); - RetainResult.Code result = doRetain(tenantId, topic, retainMessage, metadataBuilder, reader, writer); - results.put(topic, result); - } - return results; - } - - private RetainResult.Code doRetain(String tenantId, - String topic, - RetainMessage retainMessage, - RetainSetMetadata.Builder metadataBuilder, - IKVReader reader, - IKVWriter writer) { - try { - TopicMessage topicMessage = TopicMessage.newBuilder() - .setTopic(topic) - .setMessage(retainMessage.getMessage()) - .setPublisher(retainMessage.getPublisher()) - .build(); - ByteString tenantNS = KeyUtil.tenantNS(tenantId); - ByteString retainKey = KeyUtil.retainKey(tenantNS, topicMessage.getTopic()); - Optional val = reader.get(retainKey); - if (topicMessage.getMessage().getPayload().isEmpty()) { - // delete existing retained - if (val.isPresent()) { - TopicMessage existing = TopicMessage.parseFrom(val.get()); - writer.delete(retainKey); - metadataBuilder - .setCount(metadataBuilder.getCount() - 1) - .setUsedSpace(metadataBuilder.getUsedSpace() - sizeOf(existing)); - } - return RetainResult.Code.CLEARED; - } - if (val.isEmpty()) { - // retain new message - writer.put(retainKey, topicMessage.toByteString()); - metadataBuilder - .setEstExpire(Math.min(expireAt(topicMessage.getMessage()), metadataBuilder.getEstExpire())) - .setUsedSpace(metadataBuilder.getUsedSpace() + sizeOf(topicMessage)) - .setCount(metadataBuilder.getCount() + 1).build(); - } else { - // replace existing - TopicMessage existing = TopicMessage.parseFrom(val.get()); - writer.put(retainKey, topicMessage.toByteString()); - metadataBuilder - .setEstExpire(Math.min(expireAt(topicMessage.getMessage()), metadataBuilder.getEstExpire())) - .setUsedSpace(metadataBuilder.getUsedSpace() + sizeOf(topicMessage) - sizeOf(existing)); - } - return RetainResult.Code.RETAINED; - } catch (Throwable e) { - log.error("Retain failed", e); - return RetainResult.Code.ERROR; - } - } - - private void doGC(long now, - String tenantId, - RetainSetMetadata.Builder metadataBuilder, - IKVReader reader, - IKVWriter writer) { - doGC(now, null, tenantId, metadataBuilder, reader, writer); - } - - @SneakyThrows - private void doGC(long now, - Integer expirySeconds, - String tenantId, - RetainSetMetadata.Builder metadataBuilder, - IKVReader reader, - IKVWriter writer) { - Boundary range = Boundary.newBuilder() - .setStartKey(tenantNS(tenantId)) - .setEndKey(upperBound(tenantNS(tenantId))).build(); - reader.refresh(); - IKVIterator itr = reader.iterator(); - itr.seek(range.getStartKey()); - itr.next(); - int expires = 0; - int freedSpace = 0; - long earliestExp = Long.MAX_VALUE; - for (; itr.isValid() && compare(itr.key(), range.getEndKey()) < 0; itr.next()) { - TopicMessage retainedMsg = TopicMessage.parseFrom(itr.value()); - long expireTime = expireAt(retainedMsg.getMessage().getTimestamp(), - expirySeconds != null ? expirySeconds : retainedMsg.getMessage().getExpiryInterval()); - if (expireTime <= now) { - writer.delete(itr.key()); - freedSpace += sizeOf(retainedMsg); - expires++; - } else { - earliestExp = Math.min(expireTime, earliestExp); - } - } - metadataBuilder - .setCount(metadataBuilder.getCount() - expires) - .setUsedSpace(metadataBuilder.getUsedSpace() - freedSpace) - .setEstExpire(earliestExp == Long.MAX_VALUE ? now : earliestExp); - } - - private Runnable gc(GCRequest request, GCReply.Builder replyBuilder, IKVReader reader, IKVWriter writer) { + private Runnable gc(GCRequest request, GCReply.Builder replyBuilder, IKVWriter writer) { replyBuilder.setReqId(request.getReqId()); long now = request.getNow(); - Map toBeCached = new HashMap<>(); - if (request.hasTenantId()) { - String tenantId = request.getTenantId(); - Optional metadata = getMetadata(tenantId); - if (metadata.isPresent()) { - RetainSetMetadata.Builder metadataBuilder = metadata.get().toBuilder(); - if (request.hasExpirySeconds()) { - doGC(now, request.getExpirySeconds(), tenantId, metadataBuilder, reader, writer); - } else { - doGC(now, tenantId, metadataBuilder, reader, writer); - } - if (!metadataBuilder.build().equals(metadata.get())) { - if (metadataBuilder.getCount() > 0) { - writer.put(tenantNS(tenantId), metadataBuilder.build().toByteString()); - } else { - writer.delete(tenantNS(tenantId)); - } - toBeCached.put(tenantId, metadataBuilder.build()); - } - } - } else { - for (Map.Entry entry : metadataMap.entrySet()) { - String tenantId = entry.getKey(); - RetainSetMetadata metadata = entry.getValue(); - RetainSetMetadata.Builder metadataBuilder = metadata.toBuilder(); - if (request.hasExpirySeconds()) { - doGC(now, request.getExpirySeconds(), tenantId, metadataBuilder, reader, writer); - } else { - doGC(now, tenantId, metadataBuilder, reader, writer); - } - if (!metadataBuilder.build().equals(metadata)) { - if (metadataBuilder.getCount() > 0) { - writer.put(tenantNS(tenantId), metadataBuilder.build().toByteString()); - } else { - writer.delete(tenantNS(tenantId)); - } - toBeCached.put(tenantId, metadataBuilder.build()); - } + Map> removedTopics = new HashMap<>(); + List retainedMsgInfos = request.hasTenantId() + ? index.match(request.getTenantId(), MULTI_WILDCARD) : index.findAll(); + for (RetainedMsgInfo msgInfo : retainedMsgInfos) { + long expireTime = expireAt(msgInfo.timestamp, + (request.hasExpirySeconds() ? request.getExpirySeconds() : msgInfo.expirySeconds)); + if (expireTime <= now) { + writer.delete(retainKey(msgInfo.tenantId, msgInfo.topic)); + removedTopics.computeIfAbsent(msgInfo.tenantId, k -> new HashSet<>()).add(msgInfo.topic); } } return () -> { - toBeCached.forEach(this::cacheMetadata); + removedTopics.forEach((tenantId, topics) -> topics.forEach(topic -> index.remove(tenantId, topic))); + removedTopics.forEach((tenantId, topics) -> tenantsState.increaseTopicCount(tenantId, -topics.size())); }; } - private void loadMetadata() { + private void load() { + index = new RetainTopicIndex(); + tenantsState.destroy(); + try (IKVCloseableReader reader = rangeReaderProvider.get()) { IKVIterator itr = reader.iterator(); - for (itr.seekToFirst(); itr.isValid(); ) { - ByteString tenantNS = parseTenantNS(itr.key()); - String tenantId = parseTenantId(tenantNS); - try { - RetainSetMetadata metadata = RetainSetMetadata.parseFrom(itr.value()); - cacheMetadata(tenantId, metadata); - } catch (InvalidProtocolBufferException e) { - log.error("Unable to parse RetainSetMetadata", e); - } finally { - itr.seek(upperBound(tenantNS)); + for (itr.seekToFirst(); itr.isValid(); itr.next()) { + if (!isTenantNS(itr.key())) { + try { + ByteString tenantNS = parseTenantNS(itr.key()); + String tenantId = parseTenantId(tenantNS); + TopicMessage topicMessage = TopicMessage.parseFrom(itr.value()); + index.add(tenantId, topicMessage.getTopic(), topicMessage.getMessage().getTimestamp(), + topicMessage.getMessage().getExpiryInterval()); + tenantsState.increaseTopicCount(tenantId, 1); + } catch (InvalidProtocolBufferException e) { + log.error("Failed to parse retained message", e); + } } } } @@ -436,42 +303,4 @@ private long expireAt(Message message) { private long expireAt(long timestamp, int expirySeconds) { return Duration.ofMillis(timestamp).plusSeconds(expirySeconds).toMillis(); } - - private int sizeOf(TopicMessage retained) { - return retained.getTopic().length() + retained.getMessage().getPayload().size(); - } - - private Optional getMetadata(String tenantId) { - return Optional.ofNullable(metadataMap.get(tenantId)); - } - - private void clearMetrics() { - metadataMap.keySet().forEach(tenantId -> { - ITenantMeter.stopGauging(tenantId, MqttRetainNumGauge, tags); - ITenantMeter.stopGauging(tenantId, MqttRetainSpaceGauge, tags); - }); - } - - private void cacheMetadata(String tenantId, RetainSetMetadata metadata) { - metadataMap.compute(tenantId, (k, v) -> { - if (v == null) { - if (metadata.getCount() > 0) { - ITenantMeter.gauging(tenantId, MqttRetainNumGauge, - () -> metadataMap.getOrDefault(k, RetainSetMetadata.getDefaultInstance()).getCount(), tags); - - ITenantMeter.gauging(tenantId, MqttRetainSpaceGauge, - () -> metadataMap.getOrDefault(k, RetainSetMetadata.getDefaultInstance()).getUsedSpace(), tags); - return metadata; - } - return null; - } else { - if (metadata.getCount() == 0) { - ITenantMeter.stopGauging(tenantId, MqttRetainNumGauge, tags); - ITenantMeter.stopGauging(tenantId, MqttRetainSpaceGauge, tags); - return null; - } - return metadata; - } - }); - } } diff --git a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProcFactory.java b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProcFactory.java index c9e5cb79a..63cf5b132 100644 --- a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProcFactory.java +++ b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/RetainStoreCoProcFactory.java @@ -13,29 +13,21 @@ package com.baidu.bifromq.retain.store; -import static com.baidu.bifromq.basekv.utils.BoundaryUtil.upperBound; -import static com.baidu.bifromq.retain.utils.KeyUtil.parseTenantNS; - import com.baidu.bifromq.basekv.proto.KVRangeId; import com.baidu.bifromq.basekv.store.api.IKVCloseableReader; import com.baidu.bifromq.basekv.store.api.IKVRangeCoProc; import com.baidu.bifromq.basekv.store.api.IKVRangeCoProcFactory; import com.baidu.bifromq.basekv.store.api.IKVRangeSplitHinter; -import com.baidu.bifromq.basekv.store.api.IKVReader; import com.baidu.bifromq.basekv.store.range.hinter.MutationKVLoadBasedSplitHinter; import com.baidu.bifromq.basekv.utils.KVRangeIdUtil; import java.time.Duration; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; public class RetainStoreCoProcFactory implements IKVRangeCoProcFactory { private final Duration loadEstWindow; - private final Map tenantRetainSpaceMap = new ConcurrentHashMap<>(); - private final Map tenantRetainCountMap = new ConcurrentHashMap<>(); public RetainStoreCoProcFactory(Duration loadEstimateWindow) { this.loadEstWindow = loadEstimateWindow; @@ -45,10 +37,8 @@ public RetainStoreCoProcFactory(Duration loadEstimateWindow) { public List createHinters(String clusterId, String storeId, KVRangeId id, Supplier rangeReaderProvider) { return Collections.singletonList( - new MutationKVLoadBasedSplitHinter(loadEstWindow, key -> { - // make sure retain message from one tenant do no cross range - return Optional.of(upperBound(parseTenantNS(key))); - }, "clusterId", clusterId, "storeId", storeId, "rangeId", KVRangeIdUtil.toString(id))); + new MutationKVLoadBasedSplitHinter(loadEstWindow, Optional::of, + "clusterId", clusterId, "storeId", storeId, "rangeId", KVRangeIdUtil.toString(id))); } @Override diff --git a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/TenantRetainedSet.java b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/TenantRetainedSet.java new file mode 100644 index 000000000..b468ce227 --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/TenantRetainedSet.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store; + +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.intersect; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.upperBound; +import static com.baidu.bifromq.metrics.TenantMetric.MqttRetainNumGauge; +import static com.baidu.bifromq.metrics.TenantMetric.MqttRetainSpaceGauge; +import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; + +import com.baidu.bifromq.basekv.proto.Boundary; +import com.baidu.bifromq.basekv.store.api.IKVReader; +import com.baidu.bifromq.metrics.ITenantMeter; +import java.util.concurrent.atomic.AtomicLong; + +public class TenantRetainedSet { + private final AtomicLong topicCount = new AtomicLong(); + private final String tenantId; + private final String[] tags; + + public TenantRetainedSet(String tenantId, IKVReader reader, String... tags) { + this.tenantId = tenantId; + this.tags = tags; + ITenantMeter.gauging(tenantId, MqttRetainSpaceGauge, () -> reader.size(intersect(Boundary.newBuilder() + .setStartKey(tenantNS(tenantId)) + .setEndKey(upperBound(tenantNS(tenantId))) + .build(), reader.boundary())), tags); + ITenantMeter.gauging(tenantId, MqttRetainNumGauge, topicCount::get, tags); + } + + public long incrementTopicCount(int delta) { + return topicCount.addAndGet(delta); + } + + void destroy() { + ITenantMeter.stopGauging(tenantId, MqttRetainSpaceGauge, tags); + ITenantMeter.stopGauging(tenantId, MqttRetainNumGauge, tags); + } +} diff --git a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/TenantsState.java b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/TenantsState.java new file mode 100644 index 000000000..46d72511f --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/TenantsState.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store; + +import com.baidu.bifromq.basekv.store.api.IKVReader; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class TenantsState { + private final Map retainedSet = new ConcurrentHashMap<>(); + private final IKVReader reader; + private final String[] tags; + + TenantsState(IKVReader reader, String... tags) { + this.reader = reader; + this.tags = tags; + } + + void increaseTopicCount(String tenantId, int delta) { + retainedSet.compute(tenantId, (k, v) -> { + if (v == null) { + v = new TenantRetainedSet(tenantId, reader, tags); + } + if (v.incrementTopicCount(delta) == 0) { + v.destroy(); + return null; + } + return v; + }); + } + + void destroy() { + retainedSet.values().forEach(TenantRetainedSet::destroy); + retainedSet.clear(); + } +} diff --git a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/IRetainTopicIndex.java b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/IRetainTopicIndex.java new file mode 100644 index 000000000..e2edf3414 --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/IRetainTopicIndex.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store.index; + +import java.util.List; + +/** + * The index of retain topics. + */ +public interface IRetainTopicIndex { + void add(String tenantId, String topic, long timestamp, int expirySeconds); + + void remove(String tenantId, String topic); + + List match(String tenantId, String topicFilter); + + List findAll(); +} diff --git a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/RetainTopicIndex.java b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/RetainTopicIndex.java new file mode 100644 index 000000000..d103e8817 --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/RetainTopicIndex.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store.index; + +import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; +import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; +import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; + +import com.baidu.bifromq.util.TopicUtil; +import com.baidu.bifromq.util.index.Branch; +import com.baidu.bifromq.util.index.TopicLevelTrie; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RetainTopicIndex extends TopicLevelTrie implements IRetainTopicIndex { + private static final BranchSelector BranchSelector = new BranchSelector() { + @Override + public Map, Action> selectBranch(Map> branches, + List topicLevels, + int currentLevel) { + if (topicLevels.isEmpty()) { + Map, Action> result = new HashMap<>(); + for (Map.Entry> entry : branches.entrySet()) { + Branch branch = entry.getValue(); + result.put(branch, Action.MATCH_AND_CONTINUE); + } + return result; + } + if (currentLevel < topicLevels.size() - 1) { + // not last level + boolean matchParent = currentLevel + 1 == topicLevels.size() - 1 + && topicLevels.get(currentLevel + 1).equals(MULTI_WILDCARD); + String topicLevelToMatch = topicLevels.get(currentLevel); + switch (topicLevelToMatch) { + case SINGLE_WILDCARD -> { + Map, Action> result = new HashMap<>(); + for (Map.Entry> entry : branches.entrySet()) { + Branch branch = entry.getValue(); + // the first level represents tenant + if (currentLevel == 1 && entry.getKey().startsWith(SYS_PREFIX)) { + // + skip SYS topic + continue; + } + result.put(branch, matchParent ? Action.MATCH_AND_CONTINUE : Action.CONTINUE); + } + return result; + } + default -> { + assert !topicLevelToMatch.equals(MULTI_WILDCARD) : "MULTI_WILDCARD should be the last level"; + if (branches.containsKey(topicLevelToMatch)) { + return Map.of(branches.get(topicLevelToMatch), + matchParent ? Action.MATCH_AND_CONTINUE : Action.CONTINUE); + } + return Collections.emptyMap(); + } + } + } else if (currentLevel == topicLevels.size() - 1) { + // last level + String topicLevelToMatch = topicLevels.get(currentLevel); + switch (topicLevelToMatch) { + case SINGLE_WILDCARD -> { + Map, Action> result = new HashMap<>(); + for (Map.Entry> entry : branches.entrySet()) { + Branch branch = entry.getValue(); + // the first level represents tenant + if (currentLevel == 1 && entry.getKey().startsWith(SYS_PREFIX)) { + // + skip SYS topic + continue; + } + result.put(branch, Action.MATCH_AND_STOP); + } + return result; + } + case MULTI_WILDCARD -> { + Map, Action> result = new HashMap<>(); + for (Map.Entry> entry : branches.entrySet()) { + Branch branch = entry.getValue(); + // the first level represents tenant + if (currentLevel == 1 && entry.getKey().startsWith(SYS_PREFIX)) { + // # skip SYS topic + continue; + } + result.put(branch, Action.MATCH_AND_CONTINUE); + } + return result; + } + default -> { + if (branches.containsKey(topicLevelToMatch)) { + return Map.of(branches.get(topicLevelToMatch), Action.MATCH_AND_STOP); + } + return Collections.emptyMap(); + } + } + } else { + // # matches all descendant levels + Map, Action> result = new HashMap<>(); + for (Map.Entry> entry : branches.entrySet()) { + Branch branch = entry.getValue(); + result.put(branch, Action.MATCH_AND_CONTINUE); + } + return result; + } + } + }; + + public RetainTopicIndex() { + super(BranchSelector); + } + + public void add(String tenantId, String topic, long timestamp, int expirySeconds) { + add(TopicUtil.parse(tenantId, topic, false), + new RetainedMsgInfo(tenantId, topic, timestamp, expirySeconds)); + } + + public void remove(String tenantId, String topic) { + remove(TopicUtil.parse(tenantId, topic, false), + new RetainedMsgInfo(tenantId, topic, 0, 0)); + } + + public List match(String tenantId, String topicFilter) { + return lookup(TopicUtil.parse(tenantId, topicFilter, false)); + } + + @Override + public List findAll() { + return lookup(Collections.emptyList()); + } +} diff --git a/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/RetainedMsgInfo.java b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/RetainedMsgInfo.java new file mode 100644 index 000000000..e43ebd554 --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/main/java/com/baidu/bifromq/retain/store/index/RetainedMsgInfo.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store.index; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@EqualsAndHashCode +@AllArgsConstructor +@ToString +public class RetainedMsgInfo { + public final String tenantId; + public final String topic; + @EqualsAndHashCode.Exclude + public final long timestamp; + @EqualsAndHashCode.Exclude + public final int expirySeconds; +} diff --git a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/MatchTest.java b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainMatchTest.java similarity index 96% rename from bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/MatchTest.java rename to bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainMatchTest.java index a22c0a71d..90cbfcc56 100644 --- a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/MatchTest.java +++ b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainMatchTest.java @@ -21,7 +21,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -public class MatchTest extends RetainStoreTest { +public class RetainMatchTest extends RetainStoreTest { private String tenantId; @BeforeMethod(alwaysRun = true) @@ -64,9 +64,9 @@ public void wildcardTopicFilter() { assertEquals(newHashSet(matchReply.getOk().getMessagesList()), newHashSet(message3)); matchReply = requestMatch(tenantId, "/#", 10); - assertEquals(matchReply.getOk().getMessagesCount(), 4); + assertEquals(matchReply.getOk().getMessagesCount(), 3); assertEquals(newHashSet(matchReply.getOk().getMessagesList()), - newHashSet(message1, message2, message3, message4)); + newHashSet(message1, message2, message3)); matchReply = requestMatch(tenantId, "/c/#", 10); assertEquals(matchReply.getOk().getMessagesCount(), 1); diff --git a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainMatcherTest.java b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainMatcherTest.java index a850f3516..5940652fa 100644 --- a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainMatcherTest.java +++ b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainMatcherTest.java @@ -18,11 +18,13 @@ import static com.baidu.bifromq.retain.store.RetainMatcher.MatchResult.MISMATCH_AND_CONTINUE; import static com.baidu.bifromq.retain.store.RetainMatcher.MatchResult.MISMATCH_AND_STOP; import static com.baidu.bifromq.retain.store.RetainMatcher.match; -import static com.baidu.bifromq.retain.utils.TopicUtil.parse; +import static com.baidu.bifromq.util.TopicUtil.parse; import static org.testng.Assert.assertEquals; +import lombok.extern.slf4j.Slf4j; import org.testng.annotations.Test; +@Slf4j public class RetainMatcherTest { @Test public void matches() { diff --git a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/TenantRetainSetTest.java b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/TenantRetainSetTest.java new file mode 100644 index 000000000..af1c2e6f9 --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/TenantRetainSetTest.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store; + +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.intersect; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.upperBound; +import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.baidu.bifromq.basekv.proto.Boundary; +import com.baidu.bifromq.basekv.store.api.IKVReader; +import com.baidu.bifromq.metrics.ITenantMeter; +import com.baidu.bifromq.metrics.TenantMetric; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Optional; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TenantRetainSetTest { + private SimpleMeterRegistry meterRegistry; + private IKVReader reader; + + + @BeforeMethod + public void setup() { + reader = mock(IKVReader.class); + meterRegistry = new SimpleMeterRegistry(); + Metrics.globalRegistry.add(meterRegistry); + } + + @AfterMethod + public void tearDown() { + Metrics.globalRegistry.getMeters().forEach(Metrics.globalRegistry::remove); + Metrics.globalRegistry.remove(meterRegistry); + } + + @Test + public void metricValue() { + String tenantId = "tenant" + System.nanoTime(); + when(reader.boundary()).thenReturn(FULL_BOUNDARY); + when(reader.size(eq(intersect(FULL_BOUNDARY, Boundary.newBuilder() + .setStartKey(tenantNS(tenantId)) + .setEndKey(upperBound(tenantNS(tenantId))) + .build())))).thenReturn(10L); + TenantRetainedSet tenantRetainedSet = new TenantRetainedSet(tenantId, reader); + tenantRetainedSet.incrementTopicCount(10); + assertGaugeValue(tenantId, TenantMetric.MqttRetainSpaceGauge, 10); + assertGaugeValue(tenantId, TenantMetric.MqttRetainNumGauge, 10); + } + + protected void assertGaugeValue(String tenantId, TenantMetric tenantMetric, double value) { + Optional meter = getGauge(tenantId, tenantMetric); + assertTrue(meter.isPresent()); + assertEquals(((Gauge) meter.get()).value(), value); + } + + protected Optional getGauge(String tenantId, TenantMetric tenantMetric) { + return meterRegistry.getMeters().stream() + .filter(m -> m.getId().getName().equals(tenantMetric.metricName) + && tenantId.equals(m.getId().getTag(ITenantMeter.TAG_TENANT_ID))).findFirst(); + } + +} diff --git a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/TenantsStateTest.java b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/TenantsStateTest.java new file mode 100644 index 000000000..edb491b63 --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/TenantsStateTest.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.baidu.bifromq.basekv.store.api.IKVReader; +import com.baidu.bifromq.metrics.ITenantMeter; +import com.baidu.bifromq.metrics.TenantMetric; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Optional; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TenantsStateTest { + String tenantId = "tenant-" + System.nanoTime(); + private SimpleMeterRegistry meterRegistry; + private IKVReader reader; + + + @BeforeMethod + public void setup() { + reader = mock(IKVReader.class); + meterRegistry = new SimpleMeterRegistry(); + Metrics.globalRegistry.add(meterRegistry); + } + + @AfterMethod + public void tearDown() { + Metrics.globalRegistry.getMeters().forEach(Metrics.globalRegistry::remove); + Metrics.globalRegistry.remove(meterRegistry); + } + + @Test + public void increaseTopicCount() { + TenantsState tenantsState = new TenantsState(reader); + assertNoGauge(tenantId, TenantMetric.MqttRetainNumGauge); + assertNoGauge(tenantId, TenantMetric.MqttRetainSpaceGauge); + tenantsState.increaseTopicCount(tenantId, 1); + assertGauge(tenantId, TenantMetric.MqttRetainNumGauge); + assertGauge(tenantId, TenantMetric.MqttRetainSpaceGauge); + } + + @Test + public void decreaseTopicCount() { + TenantsState tenantsState = new TenantsState(reader); + tenantsState.increaseTopicCount(tenantId, 1); + assertGauge(tenantId, TenantMetric.MqttRetainNumGauge); + assertGauge(tenantId, TenantMetric.MqttRetainSpaceGauge); + + tenantsState.increaseTopicCount(tenantId, -1); + + assertNoGauge(tenantId, TenantMetric.MqttRetainNumGauge); + assertNoGauge(tenantId, TenantMetric.MqttRetainSpaceGauge); + } + + @Test + public void destroy() { + TenantsState tenantsState = new TenantsState(reader); + tenantsState.increaseTopicCount(tenantId, 1); + assertGauge(tenantId, TenantMetric.MqttRetainNumGauge); + assertGauge(tenantId, TenantMetric.MqttRetainSpaceGauge); + + tenantsState.destroy(); + assertNoGauge(tenantId, TenantMetric.MqttRetainNumGauge); + assertNoGauge(tenantId, TenantMetric.MqttRetainSpaceGauge); + } + + private void assertGauge(String tenantId, TenantMetric tenantMetric) { + Optional gauge = getGauge(tenantId, tenantMetric); + assertTrue(gauge.isPresent()); + assertEquals(gauge.get().getId().getType(), Meter.Type.GAUGE); + assertEquals(gauge.get().getId().getTag(ITenantMeter.TAG_TENANT_ID), tenantId); + } + + private void assertNoGauge(String tenantId, TenantMetric tenantMetric) { + Optional gauge = getGauge(tenantId, tenantMetric); + gauge.ifPresent(meter -> assertEquals(meter.getId().getTag(ITenantMeter.TAG_TENANT_ID), tenantId)); + } + + private Optional getGauge(String tenantId, TenantMetric tenantMetric) { + return meterRegistry.getMeters().stream() + .filter(m -> m.getId().getName().equals(tenantMetric.metricName) + && tenantId.equals(m.getId().getTag(ITenantMeter.TAG_TENANT_ID))).findFirst(); + } +} \ No newline at end of file diff --git a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/index/RetainTopicIndexTest.java b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/index/RetainTopicIndexTest.java new file mode 100644 index 000000000..a981a7c44 --- /dev/null +++ b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/index/RetainTopicIndexTest.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.retain.store.index; + +import static org.testng.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class RetainTopicIndexTest { + private RetainTopicIndex index; + + @BeforeMethod + public void setUp() { + index = new RetainTopicIndex(); + } + + @Test + public void testMatch() { + String tenantId = "tenantA"; + add(tenantId, "/", "/a", "/b", "a", "a/", "a/b", "a/b/c", "$a", "$a/", "$a/b").join(); + assertMatch(tenantId, index.match(tenantId, "/"), "/"); + assertMatch(tenantId, index.match(tenantId, "/a"), "/a"); + assertMatch(tenantId, index.match(tenantId, "/b"), "/b"); + assertMatch(tenantId, index.match(tenantId, "a"), "a"); + assertMatch(tenantId, index.match(tenantId, "a/"), "a/"); + assertMatch(tenantId, index.match(tenantId, "a/b"), "a/b"); + assertMatch(tenantId, index.match(tenantId, "a/b/c"), "a/b/c"); + assertMatch(tenantId, index.match(tenantId, "$a"), "$a"); + assertMatch(tenantId, index.match(tenantId, "$a/"), "$a/"); + assertMatch(tenantId, index.match(tenantId, "$a/b"), "$a/b"); + + assertMatch(tenantId, index.match(tenantId, "")); + assertMatch(tenantId, index.match(tenantId, "fakeTopic")); + assertMatch(tenantId, index.match(tenantId, "+"), "a"); + assertMatch(tenantId, index.match(tenantId, "+/#"), "/", "/a", "/b", "a", "a/", "a/b", "a/b/c"); + assertMatch(tenantId, index.match(tenantId, "+/+"), "/", "/a", "/b", "a/", "a/b"); + + assertMatch(tenantId, index.match(tenantId, "#"), "/", "/a", "/b", "a", "a/", "a/b", "a/b/c"); + + assertMatch(tenantId, index.match(tenantId, "/+"), "/", "/a", "/b"); + assertMatch(tenantId, index.match(tenantId, "/#"), "/", "/a", "/b"); + + assertMatch(tenantId, index.match(tenantId, "a/+"), "a/", "a/b"); + assertMatch(tenantId, index.match(tenantId, "a/#"), "a", "a/", "a/b", "a/b/c"); + + assertMatch(tenantId, index.match(tenantId, "$a/+"), "$a/", "$a/b"); + assertMatch(tenantId, index.match(tenantId, "$a/+/#"), "$a/", "$a/b"); + assertMatch(tenantId, index.match(tenantId, "$a/#"), "$a", "$a/", "$a/b"); + + assertMatch(tenantId, index.match("tenantB", "#")); + } + + @Test + public void testFindAll() { + String tenantId = "tenantA"; + add(tenantId, "/", "/a", "/b", "a", "a/", "a/b", "a/b/c", "$a", "$a/", "$a/b").join(); + assertMatch(tenantId, index.findAll(), "/", "/a", "/b", "a", "a/", "a/b", "a/b/c", "$a", "$a/", "$a/b"); + } + + @Test + public void testRemove() { + String tenantId = "tenantA"; + add("/", "/a", "/b", "a", "a/", "a/b", "a/b/c", "$a", "$a/", "$a/b").join(); + index.remove(tenantId, "/"); + assertMatch(tenantId, index.match(tenantId, "/")); + index.remove(tenantId, "/a"); + assertMatch(tenantId, index.match(tenantId, "/a")); + index.remove(tenantId, "/b"); + assertMatch(tenantId, index.match(tenantId, "/b")); + index.remove(tenantId, "a"); + assertMatch(tenantId, index.match(tenantId, "a")); + index.remove(tenantId, "a/"); + assertMatch(tenantId, index.match(tenantId, "a/")); + index.remove(tenantId, "a/b"); + assertMatch(tenantId, index.match(tenantId, "a/b")); + index.remove(tenantId, "a/b/c"); + assertMatch(tenantId, index.match(tenantId, "a/b/c")); + index.remove(tenantId, "$a"); + assertMatch(tenantId, index.match(tenantId, "$a")); + index.remove(tenantId, "$a/"); + assertMatch(tenantId, index.match(tenantId, "$a/")); + index.remove(tenantId, "$a/b"); + assertMatch(tenantId, index.match(tenantId, "$a/b")); + assertMatch(tenantId, index.match(tenantId, "#")); + } + + + @Test + public void testEdgeCases() { + String tenantId = "tenantA"; + add(tenantId, "/", "/").join(); + assertMatch(tenantId, index.match(tenantId, "#"), "/"); + } + + private CompletableFuture add(String tenantId, String... topics) { + List> futures = new ArrayList<>(); + for (String topic : topics) { + futures.add(CompletableFuture.runAsync(() -> index.add(tenantId, topic, System.nanoTime(), 1))); + } + return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)); + } + + private void assertMatch(String tenantId, List matches, String... expected) { + assertEquals(new HashSet<>(matches), + Set.of(expected).stream().map(topic -> new RetainedMsgInfo(tenantId, topic, 0, 0)) + .collect(Collectors.toSet())); + } +} diff --git a/bifromq-util/src/main/java/com/baidu/bifromq/util/TopicUtil.java b/bifromq-util/src/main/java/com/baidu/bifromq/util/TopicUtil.java index a556c9923..60801497a 100644 --- a/bifromq-util/src/main/java/com/baidu/bifromq/util/TopicUtil.java +++ b/bifromq-util/src/main/java/com/baidu/bifromq/util/TopicUtil.java @@ -13,13 +13,21 @@ package com.baidu.bifromq.util; +import static com.baidu.bifromq.util.TopicConst.DELIMITER; import static com.baidu.bifromq.util.TopicConst.DELIMITER_CHAR; import static com.baidu.bifromq.util.TopicConst.MULTIPLE_WILDCARD_CHAR; +import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; +import static com.baidu.bifromq.util.TopicConst.NUL; import static com.baidu.bifromq.util.TopicConst.NUL_CHAR; import static com.baidu.bifromq.util.TopicConst.ORDERED_SHARE; import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD_CHAR; import static com.baidu.bifromq.util.TopicConst.UNORDERED_SHARE; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + public class TopicUtil { private static final String PREFIX_UNORDERED_SHARE = UNORDERED_SHARE + DELIMITER_CHAR; private static final String PREFIX_ORDERED_SHARE = ORDERED_SHARE + DELIMITER_CHAR; @@ -148,26 +156,89 @@ public static boolean isValidTopicFilter(String topicFilter, int maxLevelLength, } public static boolean isWildcardTopicFilter(String topicFilter) { - return topicFilter.indexOf(SINGLE_WILDCARD_CHAR) >= 0 || topicFilter.indexOf(MULTIPLE_WILDCARD_CHAR) >= 0; + return topicFilter.indexOf(SINGLE_WILDCARD_CHAR) >= 0 || isMultiWildcardTopicFilter(topicFilter); + } + + public static boolean isMultiWildcardTopicFilter(String topicFilter) { + return topicFilter.endsWith(MULTI_WILDCARD); } public static boolean isSharedSubscription(String topicFilter) { - return topicFilter.startsWith(PREFIX_ORDERED_SHARE) || topicFilter.startsWith(PREFIX_UNORDERED_SHARE); + return isOrderedShared(topicFilter) || isUnorderedShared(topicFilter); } - public static String parseTopicFilter(String topicFilter) { - // must be valid topic filter - if (isSharedSubscription(topicFilter)) { - // validate share name - int i; - for (i = topicFilter.indexOf(DELIMITER_CHAR) + 1; i < topicFilter.length(); i++) { - char c = topicFilter.charAt(i); - if (c == DELIMITER_CHAR) { - break; - } + public static boolean isNormalTopicFilter(String topicFilter) { + return !isSharedSubscription(topicFilter); + } + + public static boolean isUnorderedShared(String topicFilter) { + return topicFilter.startsWith(PREFIX_UNORDERED_SHARE); + } + + public static boolean isOrderedShared(String topicFilter) { + return topicFilter.startsWith(PREFIX_ORDERED_SHARE); + } + + public static String escape(String topicFilter) { + assert !topicFilter.contains(NUL); + return topicFilter.replace(DELIMITER, NUL); + } + + public static String unescape(String topicFilter) { + return topicFilter.replace(NUL, DELIMITER); + } + + public static List parse(String tenantId, String topic, boolean isEscaped) { + List topicLevels = new ArrayList<>(); + topicLevels.add(tenantId); + return parse(topic, isEscaped, topicLevels); + } + + // parse a topic or topic filter string into a list of topic levels + // eg. "/" -> ["",""], "/a" -> ["",a], "a/" -> [a,""] + public static List parse(String topic, boolean isEscaped) { + return parse(topic, isEscaped, new ArrayList<>()); + } + + // parse a topic or topic filter string into a list of topic levels + // eg. "/" -> ["",""], "/a" -> ["",a], "a/" -> [a,""] + private static List parse(String topic, boolean isEscaped, List topicLevels) { + char splitter = isEscaped ? NUL_CHAR : DELIMITER_CHAR; + StringBuilder tl = new StringBuilder(); + for (int i = 0; i < topic.length(); i++) { + if (topic.charAt(i) == splitter) { + topicLevels.add(tl.toString()); + tl.delete(0, tl.length()); + } else { + tl.append(topic.charAt(i)); + } + } + topicLevels.add(tl.toString()); + return topicLevels; + } + + public static String fastJoin(CharSequence delimiter, Iterable strings) { + StringBuilder sb = new StringBuilder(); + Iterator itr = strings.iterator(); + while (itr.hasNext()) { + sb.append(itr.next()); + if (itr.hasNext()) { + sb.append(delimiter); + } + } + return sb.toString(); + } + + public static String fastJoin(CharSequence delimiter, Iterable items, + Function toCharSequence) { + StringBuilder sb = new StringBuilder(); + Iterator itr = items.iterator(); + while (itr.hasNext()) { + sb.append(toCharSequence.apply(itr.next())); + if (itr.hasNext()) { + sb.append(delimiter); } - return topicFilter.substring(i + 1); } - return topicFilter; + return sb.toString(); } } diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/Branch.java b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/Branch.java similarity index 97% rename from bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/Branch.java rename to bifromq-util/src/main/java/com/baidu/bifromq/util/index/Branch.java index d304ffb07..1ed385cb7 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/Branch.java +++ b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/Branch.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.util.index; import java.util.ArrayList; import java.util.Collections; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/CNode.java b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/CNode.java similarity index 98% rename from bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/CNode.java rename to bifromq-util/src/main/java/com/baidu/bifromq/util/index/CNode.java index ac08dd582..738a825bd 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/CNode.java +++ b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/CNode.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.util.index; import java.util.HashMap; import java.util.List; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/INode.java b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/INode.java similarity index 96% rename from bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/INode.java rename to bifromq-util/src/main/java/com/baidu/bifromq/util/index/INode.java index b43abfbc1..9b073bd10 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/INode.java +++ b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/INode.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.util.index; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/MainNode.java b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/MainNode.java similarity index 95% rename from bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/MainNode.java rename to bifromq-util/src/main/java/com/baidu/bifromq/util/index/MainNode.java index 83c78a782..be631adcc 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/MainNode.java +++ b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/MainNode.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.util.index; public class MainNode { final CNode cNode; diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TNode.java b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/TNode.java similarity index 93% rename from bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TNode.java rename to bifromq-util/src/main/java/com/baidu/bifromq/util/index/TNode.java index 88ff21906..23438fa38 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TNode.java +++ b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/TNode.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.util.index; public class TNode { } diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TopicLevelTrie.java b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/TopicLevelTrie.java similarity index 99% rename from bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TopicLevelTrie.java rename to bifromq-util/src/main/java/com/baidu/bifromq/util/index/TopicLevelTrie.java index ff04b1e42..3a22e93bb 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/index/TopicLevelTrie.java +++ b/bifromq-util/src/main/java/com/baidu/bifromq/util/index/TopicLevelTrie.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.worker.index; +package com.baidu.bifromq.util.index; import java.util.ArrayList; import java.util.HashMap; diff --git a/bifromq-util/src/test/java/com/baidu/bifromq/util/TopicUtilsTest.java b/bifromq-util/src/test/java/com/baidu/bifromq/util/TopicUtilsTest.java index 188dec9c9..7003b9738 100644 --- a/bifromq-util/src/test/java/com/baidu/bifromq/util/TopicUtilsTest.java +++ b/bifromq-util/src/test/java/com/baidu/bifromq/util/TopicUtilsTest.java @@ -13,13 +13,45 @@ package com.baidu.bifromq.util; +import static com.baidu.bifromq.util.TopicUtil.escape; +import static com.baidu.bifromq.util.TopicUtil.isWildcardTopicFilter; +import static com.baidu.bifromq.util.TopicUtil.parse; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import org.testng.Assert; +import java.util.List; import org.testng.annotations.Test; public class TopicUtilsTest { + @Test + public void check() { + assertTrue(isWildcardTopicFilter("#")); + assertTrue(isWildcardTopicFilter("+")); + assertTrue(isWildcardTopicFilter("/#")); + assertTrue(isWildcardTopicFilter("/+")); + assertFalse(isWildcardTopicFilter("/")); + } + + @Test + public void testParse() { + assertEquals(parse("", false), List.of("")); + assertEquals(parse(" ", false), List.of(" ")); + assertEquals(parse("/", false), List.of("", "")); + assertEquals(parse(escape("/"), true), List.of("", "")); + assertEquals(parse("//", false), List.of("", "", "")); + assertEquals(parse(escape("//"), true), List.of("", "", "")); + assertEquals(parse(" //", false), List.of(" ", "", "")); + assertEquals(parse(escape(" //"), true), List.of(" ", "", "")); + assertEquals(parse(" / / ", false), List.of(" ", " ", " ")); + assertEquals(parse(escape(" / / "), true), List.of(" ", " ", " ")); + assertEquals(parse("a/", false), List.of("a", "")); + assertEquals(parse(escape("a/"), true), List.of("a", "")); + assertEquals(parse("a/b", false), List.of("a", "b")); + assertEquals(parse(escape("a/b"), true), List.of("a", "b")); + assertEquals(parse("a/b/", false), List.of("a", "b", "")); + assertEquals(parse(escape("a/b/"), true), List.of("a", "b", "")); + } @Test public void testIsValidTopic() { @@ -110,17 +142,4 @@ public void testIsValidTopicFilter() { assertFalse(TopicUtil.isValidTopicFilter("$share/g/#/a", 10, 4, 100)); } - - @Test - public void testParseTopicFilter() { - Assert.assertEquals("/a/b/c", TopicUtil.parseTopicFilter("$share/g1//a/b/c")); - Assert.assertEquals("/a/b/c", TopicUtil.parseTopicFilter("$oshare/g1//a/b/c")); - Assert.assertEquals("a/b/c", TopicUtil.parseTopicFilter("$share/g1/a/b/c")); - Assert.assertEquals("a/b/c", TopicUtil.parseTopicFilter("$oshare/g1/a/b/c")); - Assert.assertEquals("/a/b/c", TopicUtil.parseTopicFilter("/a/b/c")); - Assert.assertEquals("$share", TopicUtil.parseTopicFilter("$share")); - Assert.assertEquals("$oshare", TopicUtil.parseTopicFilter("$oshare")); - Assert.assertEquals("$oshared/a", TopicUtil.parseTopicFilter("$oshared/a")); - } - } diff --git a/build/build-bifromq-starters/src/main/java/com/baidu/bifromq/starter/config/standalone/model/StateStoreConfig.java b/build/build-bifromq-starters/src/main/java/com/baidu/bifromq/starter/config/standalone/model/StateStoreConfig.java index 9a6a2702b..164d405fa 100644 --- a/build/build-bifromq-starters/src/main/java/com/baidu/bifromq/starter/config/standalone/model/StateStoreConfig.java +++ b/build/build-bifromq-starters/src/main/java/com/baidu/bifromq/starter/config/standalone/model/StateStoreConfig.java @@ -108,7 +108,7 @@ public static class RetainStoreConfig { @JsonSetter(nulls = Nulls.SKIP) private StorageEngineConfig walEngineConfig = new RocksDBEngineConfig() .setManualCompaction(true) - .setCompactMinTombstoneKeys(5000) + .setCompactMinTombstoneKeys(2500) .setCompactMinTombstoneRanges(2); @JsonSetter(nulls = Nulls.SKIP) private BalancerOptions balanceConfig = new BalancerOptions();