From 5e76092e15a1806729d472afecfd132246f2d8ca Mon Sep 17 00:00:00 2001 From: Yonny Hao Date: Fri, 13 Sep 2024 15:23:27 +0800 Subject: [PATCH] 1. clean up unused codes 2. optimize range/replica lookup logic in DistCallScheduler --- .../bifromq/basekv/client/KVRangeSetting.java | 34 +- .../baidu/bifromq/dist/entity/EntityUtil.java | 33 +- .../dist/entity/SharedTopicFilter.java | 28 ++ .../bifromq/dist/trie/TopicTrieNode.java | 2 +- .../bifromq/dist/util/TopicFilterMatcher.java | 148 ------- .../bifromq/dist/util/TopicFilterTrie.java | 395 ------------------ .../dist/util/TopicFilterTrieIterator.java | 69 --- .../baidu/bifromq/dist/util/TopicTrie.java | 42 -- .../bifromq/dist/util/TopicTrieIterator.java | 47 --- .../baidu/bifromq/dist/util/TopicUtil.java | 250 +---------- .../com/baidu/bifromq/dist/util/TrieNode.java | 102 ----- .../java/com/baidu/bifromq/dist/TestUtil.java | 223 ++++++++++ .../com/baidu/bifromq/dist/TestUtilTest.java | 39 ++ .../com/baidu/bifromq/dist}/TopicMatcher.java | 5 +- .../baidu/bifromq/dist/TopicMatcherTest.java | 87 ++++ .../com/baidu/bifromq/dist/trie/Fixtures.java | 14 + .../dist/trie/TopicFilterIteratorTest.java | 35 +- .../benchmark/TopicTrieBuilderBenchmark.java | 4 +- .../TopicTrieBuilderBenchmarkState.java | 20 +- .../com/baidu/bifromq/dist/util/TestUtil.java | 87 ---- .../dist/util/TopicFilterMatcherTest.java | 176 -------- .../dist/util/TopicFilterTrieTest.java | 45 -- .../dist/util/TopicTrieBuilderTest.java | 65 --- .../bifromq/dist/util/TopicUtilTest.java | 78 ---- .../bifromq/dist/server/DistService.java | 3 +- .../server/scheduler/DistCallScheduler.java | 255 ++++++++--- 26 files changed, 699 insertions(+), 1587 deletions(-) create mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/SharedTopicFilter.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterMatcher.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrie.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrieIterator.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrie.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrieIterator.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TrieNode.java create mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java create mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtilTest.java rename bifromq-dist/bifromq-dist-rpc-definition/src/{main/java/com/baidu/bifromq/dist/util => test/java/com/baidu/bifromq/dist}/TopicMatcher.java (96%) create mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java rename bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/{util => trie}/benchmark/TopicTrieBuilderBenchmark.java (93%) rename bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/{util => trie}/benchmark/TopicTrieBuilderBenchmarkState.java (69%) delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TestUtil.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterMatcherTest.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterTrieTest.java delete mode 100644 bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicTrieBuilderTest.java diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeSetting.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeSetting.java index b640f655a..cf2c6f2be 100644 --- a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeSetting.java +++ b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeSetting.java @@ -23,9 +23,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ThreadLocalRandom; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -51,12 +51,12 @@ public KVRangeSetting(String clusterId, String leaderStoreId, KVRangeDescriptor ver = desc.getVer(); boundary = desc.getBoundary(); leader = leaderStoreId; - Set voters = new HashSet<>(); - Set inProcVoters = new HashSet<>(); - Set followers = new HashSet<>(); - Set inProcFollowers = new HashSet<>(); - Set allReplicas = new HashSet<>(); - Set inProcReplicas = new HashSet<>(); + Set voters = new TreeSet<>(); + Set inProcVoters = new TreeSet<>(); + Set followers = new TreeSet<>(); + Set inProcFollowers = new TreeSet<>(); + Set allReplicas = new TreeSet<>(); + Set inProcReplicas = new TreeSet<>(); Set allVoters = Sets.newHashSet(Iterables.concat(desc.getConfig().getVotersList(), desc.getConfig().getNextVotersList())); @@ -97,10 +97,24 @@ public KVRangeSetting(String clusterId, String leaderStoreId, KVRangeDescriptor this.inProcReplicas = Collections.unmodifiableList(Lists.newArrayList(inProcReplicas)); } + public boolean hasInProcVoter() { + return !inProcVoters.isEmpty(); + } + + public boolean hasInProcReplica() { + return !inProcReplicas.isEmpty(); + } + public String randomReplica() { if (!inProcReplicas.isEmpty()) { + if (inProcReplicas.size() == 1) { + return inProcReplicas.get(0); + } return inProcReplicas.get(ThreadLocalRandom.current().nextInt(inProcReplicas.size())); } + if (allReplicas.size() == 1) { + return allReplicas.get(0); + } return allReplicas.get(ThreadLocalRandom.current().nextInt(allReplicas.size())); } @@ -108,8 +122,14 @@ public String randomVoters() { if (getInProcStores(clusterId).contains(leader)) { return leader; } else if (!inProcVoters.isEmpty()) { + if (inProcVoters.size() == 1) { + return inProcVoters.get(0); + } return inProcVoters.get(ThreadLocalRandom.current().nextInt(inProcVoters.size())); } + if (voters.size() == 1) { + return voters.get(0); + } return voters.get(ThreadLocalRandom.current().nextInt(voters.size())); } } diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java index e3ad15048..1c3df152c 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/EntityUtil.java @@ -15,7 +15,6 @@ import static com.baidu.bifromq.dist.util.TopicUtil.escape; import static com.baidu.bifromq.dist.util.TopicUtil.isNormalTopicFilter; -import static com.baidu.bifromq.dist.util.TopicUtil.parseSharedTopic; import static com.baidu.bifromq.dist.util.TopicUtil.unescape; import static com.baidu.bifromq.util.TopicConst.DELIMITER_CHAR; import static com.baidu.bifromq.util.TopicConst.NUL; @@ -25,7 +24,6 @@ import static com.google.protobuf.ByteString.copyFromUtf8; import com.baidu.bifromq.dist.rpc.proto.GroupMatchRecord; -import com.baidu.bifromq.dist.util.TopicUtil; import com.google.protobuf.ByteString; import java.nio.charset.StandardCharsets; import java.util.Base64; @@ -160,7 +158,7 @@ public static ByteString toNormalMatchRecordKey(String tenantId, String topicFil public static ByteString toGroupMatchRecordKey(String tenantId, String topicFilter) { assert !isNormalTopicFilter(topicFilter); - TopicUtil.SharedTopicFilter stf = parseSharedTopic(topicFilter); + SharedTopicFilter stf = parseSharedTopic(topicFilter); return matchRecordKeyPrefix(tenantId, stf.topicFilter) .concat(stf.ordered ? FLAG_ORDERD_SHARE : FLAG_UNORDERD_SHARE) .concat(copyFromUtf8(stf.shareGroup)); @@ -172,7 +170,7 @@ public static ByteString toMatchRecordKey(String tenantId, String topicFilter, S .concat(FLAG_NORMAL) .concat(copyFromUtf8(qInboxId)); } else { - TopicUtil.SharedTopicFilter stf = parseSharedTopic(topicFilter); + SharedTopicFilter stf = parseSharedTopic(topicFilter); return matchRecordKeyPrefix(tenantId, stf.topicFilter) .concat(stf.ordered ? FLAG_ORDERD_SHARE : FLAG_UNORDERD_SHARE) .concat(copyFromUtf8(stf.shareGroup)); @@ -187,11 +185,22 @@ public static ByteString toMatchRecordKeyPrefix(String tenantId, String topicFil if (isNormalTopicFilter(topicFilter)) { return matchRecordKeyPrefix(tenantId, topicFilter); } else { - TopicUtil.SharedTopicFilter stf = parseSharedTopic(topicFilter); + SharedTopicFilter stf = parseSharedTopic(topicFilter); return matchRecordKeyPrefix(tenantId, stf.topicFilter); } } + private static SharedTopicFilter parseSharedTopic(String topicFilter) { + assert !isNormalTopicFilter(topicFilter); + String sharePrefix = topicFilter.startsWith(UNORDERED_SHARE) ? UNORDERED_SHARE : ORDERED_SHARE; + boolean ordered = !topicFilter.startsWith(UNORDERED_SHARE); + String rest = topicFilter.substring((sharePrefix + DELIMITER_CHAR).length()); + int firstTopicSeparatorIndex = rest.indexOf(DELIMITER_CHAR); + String shareGroup = rest.substring(0, firstTopicSeparatorIndex); + return new SharedTopicFilter(topicFilter, ordered, shareGroup, + rest.substring(firstTopicSeparatorIndex + 1)); + } + public static String parseTopicFilter(String matchRecordKeyStr) { // <1> int firstSplit = matchRecordKeyStr.indexOf(NUL_CHAR); @@ -238,4 +247,18 @@ public static String parseOriginalTopicFilter(String matchRecordKeyStr) { } } } + + public static TenantAndEscapedTopicFilter parseTenantAndEscapedTopicFilter(ByteString matchRecordKey) { + String matchRecordKeyStr = matchRecordKey.toStringUtf8(); + int firstSplit = matchRecordKeyStr.indexOf(NUL_CHAR); + int lastSplit = matchRecordKeyStr.lastIndexOf(NUL_CHAR); + return new TenantAndEscapedTopicFilter(matchRecordKeyStr.substring(0, firstSplit), + matchRecordKeyStr.substring(firstSplit + 2, lastSplit)); + } + + public record TenantAndEscapedTopicFilter(String tenantId, String escapedTopicFilter) { + public String toGlobalTopicFilter() { + return tenantId + NUL + escapedTopicFilter; + } + } } diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/SharedTopicFilter.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/SharedTopicFilter.java new file mode 100644 index 000000000..e5cc8594b --- /dev/null +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/entity/SharedTopicFilter.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.dist.entity; + +public class SharedTopicFilter { + public String originTopicFilter; + public boolean ordered; + public String shareGroup; + public String topicFilter; + + public SharedTopicFilter(String originTopicFilter, boolean ordered, String shareName, String filter) { + this.originTopicFilter = originTopicFilter; + this.ordered = ordered; + this.shareGroup = shareName; + this.topicFilter = filter; + } +} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/trie/TopicTrieNode.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/trie/TopicTrieNode.java index 47e293541..8c59c4243 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/trie/TopicTrieNode.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/trie/TopicTrieNode.java @@ -97,7 +97,7 @@ TopicTrieNode child(String levelName) { } /** - * The builder for building a TopicTrie. + * The builder for building a TopicTrieNode. * * @param the value associated with the topic */ diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterMatcher.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterMatcher.java deleted file mode 100644 index fbfb8a214..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterMatcher.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; -import static com.baidu.bifromq.util.TopicConst.NUL; -import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; - -import com.baidu.bifromq.type.TopicMessage; -import com.baidu.bifromq.util.TopicConst; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -public class TopicFilterMatcher { - private final TopicTrie topicTrie; - private final TopicFilterTrieIterator iterator; - - public TopicFilterMatcher(TopicTrie root) { - topicTrie = root; - iterator = new TopicFilterTrieIterator(root); - } - - public Optional>> match(String escapedTopicFilter) { - List filterLevels = parse(escapedTopicFilter); - Map> matchedTopics = new HashMap<>(); - match(0, filterLevels, topicTrie.root(), matchedTopics); - return matchedTopics.isEmpty() ? Optional.empty() : Optional.of(matchedTopics); - } - - private void match(int matchingLevel, List filterLevels, TrieNode node, - Map> matchedTopics) { - String filterName = filterLevels.get(matchingLevel); - if (matchingLevel + 1 < filterLevels.size() && filterLevels.get(matchingLevel + 1).equals("#")) { - // # match parent level as well. [MQTT-4.7.1-2] - if (node.isLastTopicLevel()) { - if (node.levelName().equals(filterName) || filterName.equals("+")) { - // current node matches real topic - filterLevels.set(matchingLevel, node.levelName()); - matchedTopics.put(TopicUtil.fastJoin("/", filterLevels.subList(1, matchingLevel + 1)), - node.messages()); - filterLevels.set(matchingLevel, filterName); - } - } - } - switch (filterName) { - case MULTI_WILDCARD: - if (matchingLevel == 1 && node.levelName().startsWith(TopicConst.SYS_PREFIX)) { - // system topic should not be matched by first "#". [MQTT-4.7.2-1] - break; - } - // '#' matches current node, so we replacing it with actual level name - filterLevels.set(matchingLevel, node.levelName()); - if (node.isLastTopicLevel()) { - // current node matches real topic - matchedTopics.put(TopicUtil.fastJoin("/", filterLevels.subList(1, filterLevels.size())), - node.messages()); - } - // append a # for matching next level - filterLevels.add("#"); - for (TrieNode childNode : node.children()) { - match(matchingLevel + 1, filterLevels, childNode, matchedTopics); - } - filterLevels.set(matchingLevel, "#"); - filterLevels.remove(matchingLevel + 1); - break; - case SINGLE_WILDCARD: - if (matchingLevel == 1 && node.levelName().startsWith(TopicConst.SYS_PREFIX)) { - // system topic should not be matched by first "+". [MQTT-4.7.2-1] - break; - } - // '+' matches current node, so we replacing it with actual level name - filterLevels.set(matchingLevel, node.levelName()); - if (matchingLevel + 1 == filterLevels.size()) { - // if no more level to match and current node matches real topic - if (node.isLastTopicLevel()) { - matchedTopics.put(TopicUtil.fastJoin("/", filterLevels.subList(1, filterLevels.size())), - node.messages()); - } - } else { - // else there are more levels to match - for (TrieNode childNode : node.children()) { - match(matchingLevel + 1, filterLevels, childNode, matchedTopics); - } - } - filterLevels.set(matchingLevel, "+"); - break; - default: - if (filterName.equals(node.levelName())) { - if (matchingLevel + 1 == filterLevels.size()) { - // if no more level to match and current node matches real topic - if (node.isLastTopicLevel()) { - // current node matches real topic - matchedTopics.put(TopicUtil.fastJoin("/", filterLevels.subList(1, filterLevels.size())), - node.messages()); - } - } else { - // else there are more levels to match - for (TrieNode childNode : node.children()) { - match(matchingLevel + 1, filterLevels, childNode, matchedTopics); - } - } - } - } - } - - public Optional higher(String escapedTopicFilter) { - iterator.forward(escapedTopicFilter); - while (iterator.hasNext()) { - String topicFilter = iterator.next(); - if (topicFilter.compareTo(escapedTopicFilter) > 0) { - return Optional.of(topicFilter); - } - } - return Optional.empty(); - } - - private static List parse(String escapedTopicFilter) { - List topicLevels = new ArrayList<>(); - topicLevels.add(NUL); // always starts with NUL - char splitter = '\u0000'; - StringBuilder tl = new StringBuilder(); - for (int i = 0; i < escapedTopicFilter.length(); i++) { - if (escapedTopicFilter.charAt(i) == splitter) { - topicLevels.add(tl.toString()); - tl.delete(0, tl.length()); - } else { - tl.append(escapedTopicFilter.charAt(i)); - } - } - topicLevels.add(tl.toString()); - return topicLevels; - } - -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrie.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrie.java deleted file mode 100644 index a3dd6d889..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrie.java +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; -import static com.baidu.bifromq.util.TopicConst.NUL; -import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; -import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; -import static com.google.common.collect.Lists.newLinkedList; -import static java.util.Collections.emptyIterator; -import static java.util.Collections.emptyList; - -import com.google.common.collect.AbstractIterator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import javax.annotation.CheckForNull; - -public abstract class TopicFilterTrie { - private static final List MULTI_FILTER = List.of(TrieNode.MULTI); - private static final List MULTI_SINGLE_FILTER = List.of(TrieNode.MULTI, TrieNode.SINGLE); - - public static TopicFilterTrie build(TopicTrie trie) { - return new TopicFilterTrieRoot(trie.root()); - } - - abstract String levelName(); - - abstract boolean isMatchTopic(); - - abstract Iterator children(); - - abstract void forward(List filterLevels); - - private static class TopicFilterTrieRoot extends TopicFilterTrie { - private final TrieNode root; - private final boolean allSySTopics; - private final IChildren children; - - TopicFilterTrieRoot(TrieNode trieRoot) { - assert trieRoot.levelName().equals(NUL); - this.root = trieRoot; - allSySTopics = root.children().stream().allMatch(child -> child.levelName().startsWith(SYS_PREFIX)); - if (root.children().isEmpty()) { - children = NoChildren.INSTANCE; - } else { - if (allSySTopics) { - children = new MergedChildren(List.of(root.children()), true); - } else { - List> mergingChildren = List.of(MULTI_SINGLE_FILTER, root.children()); - children = new MergedChildren(mergingChildren, true); - } - } - } - - String levelName() { - return root.levelName(); - } - - @Override - boolean isMatchTopic() { - return false; - } - - @Override - Iterator children() { - return children.iterator(); - } - - @Override - void forward(List filterLevels) { - // filterLevels must be no less than floorFilterLevels in byte-wise order - children.forward(filterLevels); - } - } - - private static class TopicFilterTrieMultiLevel extends TopicFilterTrie { - static final TopicFilterTrie INSTANCE = new TopicFilterTrieMultiLevel(); - - @Override - String levelName() { - return MULTI_WILDCARD; - } - - @Override - boolean isMatchTopic() { - return true; - } - - @Override - Iterator children() { - return emptyIterator(); - } - - @Override - void forward(List filterLevels) { - - } - } - - private static class TopicFilterTrieMergedLevel extends TopicFilterTrie { - private final String levelName; - private final boolean isMatchTopic; - - private final IChildren children; - - TopicFilterTrieMergedLevel(String levelName, List> childrenList, boolean isMatchTopic) { - this.levelName = levelName; - this.isMatchTopic = isMatchTopic; - List> mergingChildren = new ArrayList<>(childrenList.size() + 1); - if (!levelName.isEmpty()) { - // # matching parent level - mergingChildren.add(childrenList.isEmpty() ? MULTI_FILTER : MULTI_SINGLE_FILTER); - mergingChildren.addAll(childrenList); - } else if (!childrenList.isEmpty()) { - mergingChildren.add(MULTI_SINGLE_FILTER); - mergingChildren.addAll(childrenList); - } - children = new MergedChildren(mergingChildren); - } - - @Override - String levelName() { - return levelName; - } - - @Override - boolean isMatchTopic() { - return isMatchTopic; - } - - @Override - Iterator children() { - return children.iterator(); - } - - @Override - void forward(List filterLevels) { - children.forward(filterLevels); - } - } - - private interface IChildren extends Iterable { - void forward(List filterLevels); - } - - private static class NoChildren implements IChildren { - public static final IChildren INSTANCE = new NoChildren(); - - @Override - public void forward(List filterLevels) { - - } - - @Override - public Iterator iterator() { - return emptyIterator(); - } - } - - private static class MergedChildren implements IChildren { - private final List> mergingChildren; - private final boolean skipSysTopic; - private final ChildAccessor childAccessor; - - - MergedChildren(List> mergingChildren) { - this(mergingChildren, false); - } - - MergedChildren(List> mergingChildren, boolean skipSysTopic) { - this.mergingChildren = mergingChildren; - this.skipSysTopic = skipSysTopic; - this.childAccessor = new ChildAccessor(); - forward(emptyList()); - } - - public void forward(List filterLevels) { - childAccessor.forward(filterLevels); - } - - @Override - public Iterator iterator() { - return new AbstractIterator<>() { - private final ChildAccessor childIterAccessor = childAccessor.duplicate(); - - @CheckForNull - @Override - protected TopicFilterTrie computeNext() { - TopicFilterTrie next = childIterAccessor.currentFilterTrie(); - if (next == null) { - return endOfData(); - } - // find next - childIterAccessor.next(); - return next; - } - }; - } - - private class ChildAccessor { - private final int[] pos; - private int next = -1; - private TopicFilterTrie filterTrie = null; // the generated filter trie corresponding to current trie node - private boolean done; - - ChildAccessor() { - pos = new int[mergingChildren.size()]; - findNext(); - } - - ChildAccessor(int[] pos, int next, boolean done, TopicFilterTrie filterTrie) { - this.pos = Arrays.copyOf(pos, pos.length); - this.next = next; - this.done = done; - this.filterTrie = filterTrie; - } - - ChildAccessor duplicate() { - return new ChildAccessor(pos, next, done, filterTrie); - } - - TopicFilterTrie currentFilterTrie() { - return filterTrie; - } - - void forward(List filterLevels) { - Optional currentChild = current(); - if (!currentChild.isPresent()) { - return; - } - List childFloorFilterLevels = emptyList(); - if (!filterLevels.isEmpty()) { - String floorLevelName = filterLevels.get(0); - int c = currentChild.get().levelName().compareTo(floorLevelName); - if (c == 0) { - // level name match - if (filterLevels.size() > 1) { - // more filter levels to compare - childFloorFilterLevels = filterLevels.subList(1, filterLevels.size()); - } else { - // current level equals or greater - childFloorFilterLevels = emptyList(); - } - } else if (c < 0) { - // forward to next smallest - advance(); - currentChild = current(); - while (currentChild.isPresent()) { - if (currentChild.get().levelName().compareTo(floorLevelName) < 0) { - advance(); - currentChild = current(); - } else { - break; - } - } - if (currentChild.isPresent()) { - if (currentChild.get().levelName().compareTo(floorLevelName) == 0) { - // level name match - if (filterLevels.size() > 1) { - // more filter levels to compare - childFloorFilterLevels = filterLevels.subList(1, filterLevels.size()); - } else { - // current level equals or greater - childFloorFilterLevels = emptyList(); - } - } else { - // level name is greater, no more filter levels to compare - childFloorFilterLevels = emptyList(); - } - } - } - } else { - childFloorFilterLevels = emptyList(); - } - if (currentChild.isPresent()) { - genFilterTrie(childFloorFilterLevels.isEmpty()); - currentFilterTrie().forward(childFloorFilterLevels); - } - } - - void next() { - if (advance()) { - genFilterTrie(true); - } - } - - // if has next - private boolean advance() { - if (next >= 0) { - pos[next]++; - findNext(); - } - filterTrie = null; - return next >= 0; - } - - private void findNext() { - if (done) { - return; - } - next = -1; - for (int i = 0; i < pos.length; i++) { - if (pos[i] < mergingChildren.get(i).size()) { - if (next < 0) { - next = i; - } else { - if (mergingChildren.get(i).get(pos[i]).levelName() - .compareTo(mergingChildren.get(next).get(pos[next]).levelName()) < 0) { - next = i; - } - } - } - } - done = next < 0; - } - - private Optional current() { - return next >= 0 ? Optional.of(mergingChildren.get(next).get(pos[next])) : Optional.empty(); - } - - private void genFilterTrie(boolean startMatching) { - assert current().isPresent(); - TrieNode node = current().get(); - switch (node.levelName()) { - case MULTI_WILDCARD: { - filterTrie = TopicFilterTrie.TopicFilterTrieMultiLevel.INSTANCE; - break; - } - case SINGLE_WILDCARD: { - // merge children's children - LinkedList> grandChildren = newLinkedList(); - boolean isMatchTopic = false; - for (int i = 1; i < mergingChildren.size(); i++) { - for (TrieNode child : mergingChildren.get(i)) { - if (skipSysTopic && child.levelName().startsWith(SYS_PREFIX)) { - continue; - } - if (child.isLastTopicLevel()) { - isMatchTopic = true; - } - if (!child.children().isEmpty()) { - grandChildren.add(child.children()); - } - } - } - filterTrie = new TopicFilterTrie.TopicFilterTrieMergedLevel(SINGLE_WILDCARD, grandChildren, - startMatching && isMatchTopic); - break; - } - default: { - LinkedList> grandChildren = newLinkedList(); - if (!node.children().isEmpty()) { - grandChildren.add(node.children()); - } - boolean isMatchTopic = node.isLastTopicLevel(); - - // merge all children of trie node with same level name - ChildAccessor childAccessor = duplicate(); - childAccessor.advance(); - Optional nextNode = childAccessor.current(); - while (nextNode.isPresent()) { - if (nextNode.get().levelName().equals(node.levelName())) { - isMatchTopic |= nextNode.get().isLastTopicLevel(); - grandChildren.add(nextNode.get().children()); - childAccessor.advance(); - nextNode = childAccessor.current(); - } else { - break; - } - } - filterTrie = new TopicFilterTrie.TopicFilterTrieMergedLevel(node.levelName(), grandChildren, - startMatching && isMatchTopic); - break; - } - } - } - } - } -} - diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrieIterator.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrieIterator.java deleted file mode 100644 index 1768a6c58..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicFilterTrieIterator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.dist.util.TopicUtil.fastJoin; -import static com.baidu.bifromq.util.TopicConst.NUL; - -import com.google.common.collect.AbstractIterator; -import java.util.Iterator; -import javax.annotation.CheckForNull; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.map.LinkedMap; - -@Slf4j -public class TopicFilterTrieIterator extends AbstractIterator { - private final TopicFilterTrie root; - private final LinkedMap> current; - - public TopicFilterTrieIterator(TopicTrie topicTrie) { - root = TopicFilterTrie.build(topicTrie); - this.current = new LinkedMap<>(); - this.current.put(root, root.children()); - } - - public void forward(String escapedTopicFilter) { - root.forward(TopicUtil.parse(escapedTopicFilter, true)); - this.current.clear(); - this.current.put(root, root.children()); - } - - @CheckForNull - @Override - protected String computeNext() { - findNext(); - if (current.isEmpty()) { - return endOfData(); - } - return fastJoin(NUL, current.keySet(), TopicFilterTrie::levelName).substring(2); - } - - private void findNext() { - if (current.isEmpty()) { - return; - } - TopicFilterTrie node = current.lastKey(); - Iterator childItr = current.get(node); - if (childItr.hasNext()) { - TopicFilterTrie child = childItr.next(); - current.put(child, child.children()); - if (!child.isMatchTopic()) { - findNext(); - } - } else { - current.remove(node); - findNext(); - } - } -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrie.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrie.java deleted file mode 100644 index 8b9286a6b..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrie.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import com.baidu.bifromq.type.TopicMessage; -import java.util.List; - -public class TopicTrie { - private int topicCount; - private final TrieNode root; - - public TopicTrie() { - root = new TrieNode(); - } - - public TrieNode root() { - return root; - } - - public void add(String topic, Iterable messages) { - assert !topic.isEmpty(); - List topicLevels = TopicUtil.parse(topic, false); - if (root.add(topicLevels, messages)) { - topicCount++; - } - } - - public int topicCount() { - return topicCount; - } -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrieIterator.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrieIterator.java deleted file mode 100644 index 6f26063ae..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicTrieIterator.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.dist.util.TopicUtil.fastJoin; -import static com.baidu.bifromq.util.TopicConst.DELIMITER; -import static java.util.Collections.singleton; - -import com.baidu.bifromq.type.TopicMessage; -import com.google.common.collect.Lists; -import java.util.LinkedList; -import java.util.List; - -public interface TopicTrieIterator { - static void iterate(TrieNode root, TopicTrieIterator iterator) { - LinkedList> toVisit = Lists.newLinkedList(); - toVisit.add(Lists.newLinkedList(singleton(root))); - while (!toVisit.isEmpty()) { - LinkedList current = toVisit.poll(); - TrieNode lastTopicLevel = current.getLast(); - if (lastTopicLevel.isLastTopicLevel()) { - TrieNode first = current.removeFirst(); - iterator.next(fastJoin(DELIMITER, current, TrieNode::levelName), lastTopicLevel.messages()); - current.addFirst(first); - } - List children = lastTopicLevel.children(); - for (int i = children.size() - 1; i >= 0; i--) { - LinkedList next = Lists.newLinkedList(current); - next.addLast(children.get(i)); - toVisit.addFirst(next); - } - } - } - - void next(String topic, Iterable messages); -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicUtil.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicUtil.java index 5e024437e..64f2dcb15 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicUtil.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicUtil.java @@ -20,226 +20,14 @@ import static com.baidu.bifromq.util.TopicConst.NUL_CHAR; import static com.baidu.bifromq.util.TopicConst.ORDERED_SHARE; import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; -import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; import static com.baidu.bifromq.util.TopicConst.UNORDERED_SHARE; -import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Lists.newLinkedList; -import static java.util.Collections.singleton; -import com.baidu.bifromq.type.TopicMessage; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.Optional; -import java.util.TreeMap; import java.util.function.Function; -import java.util.stream.Collectors; public class TopicUtil { - public static List expand(String topic) { - List topicLevels = parse(topic, false); - List topicFilters = newArrayList(); - LinkedList> toVisit = newLinkedList(); - String rootLevel = topicLevels.get(0); - if (rootLevel.startsWith(SYS_PREFIX)) { - // sys topic don't match "#" and "+" - toVisit.add(newLinkedList(singleton(rootLevel))); - } else { - toVisit.addAll(toFilters(rootLevel)); - } - while (!toVisit.isEmpty()) { - LinkedList currentFilterLevels = toVisit.pollFirst(); - switch (currentFilterLevels.getLast()) { - case MULTI_WILDCARD: - topicFilters.add(fastJoin(NUL, currentFilterLevels)); - break; - case SINGLE_WILDCARD: - default: - if (currentFilterLevels.size() == topicLevels.size()) { - String tf = fastJoin(NUL, currentFilterLevels); - topicFilters.add(tf); - if (!currentFilterLevels.getLast().isEmpty()) { - // "#" also matches parent level if current level name is not empty string - topicFilters.add(fastJoin(NUL, List.of(tf, MULTI_WILDCARD))); - } - } else { - toFilters(topicLevels.get(currentFilterLevels.size())) - .descendingIterator() - .forEachRemaining(f -> { - currentFilterLevels.descendingIterator().forEachRemaining(f::addFirst); - toVisit.addFirst(f); - }); - } - } - } - return topicFilters; - } - - public static List expand(TopicTrie trie) { - List topicFilters = newArrayList(); - LinkedList> toVisit = newLinkedList(); - toVisit.addAll(genFilters(trie.root())); - while (!toVisit.isEmpty()) { - LinkedList currentFilterLevels = toVisit.pollFirst(); - TrieNode lastFilter = currentFilterLevels.getLast(); - switch (lastFilter.levelName()) { - case MULTI_WILDCARD: - topicFilters.add(fastJoin(NUL, currentFilterLevels, TrieNode::levelName)); - break; - case SINGLE_WILDCARD: - default: - TrieNode lastFilterLevel = currentFilterLevels.getLast(); - if (lastFilterLevel.isLastTopicLevel()) { - String tf = fastJoin(NUL, currentFilterLevels, TrieNode::levelName); - topicFilters.add(tf); - } - genFilters(lastFilterLevel).descendingIterator() - .forEachRemaining(f -> { - currentFilterLevels.descendingIterator().forEachRemaining(f::addFirst); - toVisit.addFirst(f); - }); - } - } - return topicFilters; - } - - public static Optional findNext(List topicLevels, String escapedTopicFilter) { - // the algorithm is O(log(n)) - String rootLevel = topicLevels.get(0); - LinkedList> toVisit = newLinkedList(); - if (rootLevel.startsWith(SYS_PREFIX)) { - // sys topic don't match "#" and "+" - toVisit.add(newLinkedList(singleton(rootLevel))); - } else { - toVisit.addAll(toFilters(rootLevel)); - } - - String nextFilter = null; - out: - while (!toVisit.isEmpty()) { - LinkedList currentFilterLevels = toVisit.pollFirst(); - switch (currentFilterLevels.getLast()) { - case MULTI_WILDCARD: { - String current = fastJoin(NUL, currentFilterLevels); - if (escapedTopicFilter.compareTo(current) < 0) { - nextFilter = current; - break out; - } else { - while (!toVisit.isEmpty()) { - String next = fastJoin(NUL, toVisit.peekFirst()); - if (escapedTopicFilter.compareTo(next) <= 0 || escapedTopicFilter.startsWith(next)) { - break; - } else { - toVisit.pollFirst(); - } - } - } - break; - } - case SINGLE_WILDCARD: - default: { - if (currentFilterLevels.size() == topicLevels.size()) { - String current = fastJoin(NUL, currentFilterLevels); - if (escapedTopicFilter.compareTo(current) < 0) { - nextFilter = current; - break out; - } - current = fastJoin(NUL, List.of(current, MULTI_WILDCARD)); - if (escapedTopicFilter.compareTo(current) < 0) { - nextFilter = current; - break out; - } - while (!toVisit.isEmpty()) { - String next = fastJoin(NUL, toVisit.peekFirst()); - if (escapedTopicFilter.compareTo(next) <= 0 || escapedTopicFilter.startsWith(next)) { - break; - } else { - toVisit.pollFirst(); - } - } - } else { - toFilters(topicLevels.get(currentFilterLevels.size())).descendingIterator() - .forEachRemaining(f -> { - currentFilterLevels.descendingIterator().forEachRemaining(f::addFirst); - toVisit.addFirst(f); - }); - } - - } - } - } - return Optional.ofNullable(nextFilter); - } - - private static LinkedList> genFilters(TrieNode node) { - LinkedList> filters = newLinkedList(); - if (node.levelName().equals(NUL) - && node.children().stream().allMatch(child -> child.levelName().startsWith(SYS_PREFIX))) { - // SYS topics are not matched by # and + - if (!node.children().isEmpty()) { - node.children().forEach(c -> filters.add(newLinkedList(singleton(c)))); - } - } else { - if (!node.children().isEmpty()) { - // # and + matches all non SYS topics - TrieNode singleLevelFilter = merge(node.children().stream() - .filter(child -> !node.levelName().equals(NUL) || !child.levelName().startsWith(SYS_PREFIX)) - .map(child -> child.duplicate(SINGLE_WILDCARD)).collect(Collectors.toList())) - .iterator().next(); - merge(Iterables.concat(newArrayList(singleLevelFilter, TrieNode.MULTI), node.children())) - .forEach(n -> filters.add(newLinkedList(singleton(n)))); - } else if (!node.levelName().isEmpty()) { - // # also match parent level if current level name is not empty string - filters.add(newLinkedList(singleton(TrieNode.MULTI))); - } - } - return filters; - } - - private static Iterable merge(Iterable nodes) { - class MergedState { - Iterable messages = new LinkedList<>(); - final List childList = new ArrayList<>(); - } - TreeMap merged = new TreeMap<>(); - for (TrieNode node : nodes) { - merged.compute(node.levelName(), (k, b) -> { - if (b == null) { - b = new MergedState(); - } - b.messages = Iterables.concat(b.messages, node.messages()); - Iterable mergedChildren = merge(Iterables.concat(b.childList, node.children())); - b.childList.clear(); - b.childList.addAll(Lists.newLinkedList(mergedChildren)); - return b; - }); - } - return Iterables.transform(merged.entrySet(), - entry -> new TrieNode(entry.getKey(), newArrayList(entry.getValue().messages), - entry.getValue().childList)); - } - - private static LinkedList> toFilters(String topicLevel) { - LinkedList> filters = newLinkedList(); - if (MULTI_WILDCARD.compareTo(topicLevel) > 0) { - filters.add(newLinkedList(singleton(topicLevel))); - filters.add(newLinkedList(singleton(MULTI_WILDCARD))); - filters.add(newLinkedList(singleton(SINGLE_WILDCARD))); - } else if (SINGLE_WILDCARD.compareTo(topicLevel) > 0) { - filters.add(newLinkedList(singleton(MULTI_WILDCARD))); - filters.add(newLinkedList(singleton(topicLevel))); - filters.add(newLinkedList(singleton(SINGLE_WILDCARD))); - } else { - filters.add(newLinkedList(singleton(MULTI_WILDCARD))); - filters.add(newLinkedList(singleton(SINGLE_WILDCARD))); - filters.add(newLinkedList(singleton(topicLevel))); - } - return filters; - } - public static String escape(String topicFilter) { assert !topicFilter.contains(NUL); return topicFilter.replace(DELIMITER, NUL); @@ -249,10 +37,21 @@ public static String unescape(String topicFilter) { return topicFilter.replace(NUL, DELIMITER); } + public static List parse(String tenantId, String topic, boolean isEscaped) { + List topicLevels = new ArrayList<>(); + topicLevels.add(tenantId); + return parse(topic, isEscaped, topicLevels); + } + // parse a topic or topic filter string into a list of topic levels // eg. "/" -> ["",""], "/a" -> ["",a], "a/" -> [a,""] public static List parse(String topic, boolean isEscaped) { - List topicLevels = new ArrayList<>(); + return parse(topic, isEscaped, new ArrayList<>()); + } + + // parse a topic or topic filter string into a list of topic levels + // eg. "/" -> ["",""], "/a" -> ["",a], "a/" -> [a,""] + private static List parse(String topic, boolean isEscaped, List topicLevels) { char splitter = isEscaped ? NUL_CHAR : DELIMITER_CHAR; StringBuilder tl = new StringBuilder(); for (int i = 0; i < topic.length(); i++) { @@ -283,31 +82,6 @@ public static boolean isOrderedShared(String topicFilter) { return topicFilter.startsWith(ORDERED_SHARE); } - public static SharedTopicFilter parseSharedTopic(String topicFilter) { - assert !isNormalTopicFilter(topicFilter); - String sharePrefix = topicFilter.startsWith(UNORDERED_SHARE) ? UNORDERED_SHARE : ORDERED_SHARE; - boolean ordered = !topicFilter.startsWith(UNORDERED_SHARE); - String rest = topicFilter.substring((sharePrefix + DELIMITER_CHAR).length()); - int firstTopicSeparatorIndex = rest.indexOf(DELIMITER_CHAR); - String shareGroup = rest.substring(0, firstTopicSeparatorIndex); - return new SharedTopicFilter(topicFilter, ordered, shareGroup, - rest.substring(firstTopicSeparatorIndex + 1)); - } - - public static class SharedTopicFilter { - public String originTopicFilter; - public boolean ordered; - public String shareGroup; - public String topicFilter; - - SharedTopicFilter(String originTopicFilter, boolean ordered, String shareName, String filter) { - this.originTopicFilter = originTopicFilter; - this.ordered = ordered; - this.shareGroup = shareName; - this.topicFilter = filter; - } - } - public static String fastJoin(CharSequence delimiter, Iterable strings) { StringBuilder sb = new StringBuilder(); Iterator itr = strings.iterator(); diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TrieNode.java b/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TrieNode.java deleted file mode 100644 index be5e603f8..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TrieNode.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; -import static com.baidu.bifromq.util.TopicConst.NUL; -import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; - -import com.baidu.bifromq.type.TopicMessage; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class TrieNode { - public static final TrieNode MULTI = new TrieNode(MULTI_WILDCARD); - public static final TrieNode SINGLE = new TrieNode(SINGLE_WILDCARD); - private final String levelName; - private final Map childMap = new HashMap<>(); - private boolean isLastTopicLevel; - private Iterable messages; - private List sortedChildList; - - public TrieNode() { - this(NUL); - } - - public TrieNode(String levelName) { - this(levelName, new LinkedList<>(), Collections.emptyList()); - } - - public TrieNode(String levelName, - Iterable msgs, - Iterable childList) { - this.levelName = levelName; - messages = msgs; - isLastTopicLevel = messages.iterator().hasNext(); - childList.forEach(child -> childMap.put(child.levelName, child)); - } - - public String levelName() { - return levelName; - } - - public boolean isLastTopicLevel() { - return isLastTopicLevel; - } - - public Iterable messages() { - return messages; - } - - public List children() { - if (sortedChildList == null) { - sortedChildList = Lists.newArrayList(childMap.values()); - sortedChildList.sort(Comparator.comparing(TrieNode::levelName)); - } - return sortedChildList; - } - - public boolean add(List topicLevels, Iterable messages) { - assert !topicLevels.isEmpty(); - String childLevelName = topicLevels.get(0); - TrieNode child = childMap.computeIfAbsent(childLevelName, k -> { - // new child added, reset the sorted list - sortedChildList = null; - return new TrieNode(k); - }); - if (topicLevels.size() > 1) { - return child.add(topicLevels.subList(1, topicLevels.size()), messages); - } else { - boolean newTopic = !child.isLastTopicLevel(); - child.addMessages(messages); - assert child.isLastTopicLevel(); - return newTopic; - } - } - - public TrieNode duplicate(String levelName) { - return new TrieNode(levelName, this.messages, this.childMap.values()); - } - - void addMessages(Iterable msgs) { - messages = Iterables.concat(messages, msgs); - isLastTopicLevel = messages.iterator().hasNext(); - } -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java new file mode 100644 index 000000000..664f43f82 --- /dev/null +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtil.java @@ -0,0 +1,223 @@ +/* + * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.dist; + +import static com.baidu.bifromq.dist.util.TopicUtil.fastJoin; +import static com.baidu.bifromq.dist.util.TopicUtil.parse; +import static com.baidu.bifromq.util.TopicConst.DELIMITER; +import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; +import static com.baidu.bifromq.util.TopicConst.NUL; +import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; +import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Lists.newLinkedList; +import static java.util.Collections.singleton; + +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; + +public class TestUtil { + public static String randomTopic() { + return randomTopic(16); + } + + public static String randomTopicFilter() { + return randomTopicFilter(16); + } + + public static String randomTopic(int maxLevel) { + int levels = ThreadLocalRandom.current().nextInt(1, maxLevel + 1); + String[] topicLevels = new String[levels]; + for (int i = 0; i < levels; i++) { + topicLevels[i] = RandomTopicName.nextString(8); + } + if (ThreadLocalRandom.current().nextFloat() > 0.5) { + topicLevels[0] = SYS_PREFIX + topicLevels[0];// make + } + String topic = String.join(DELIMITER, topicLevels); + return ThreadLocalRandom.current().nextFloat() > 0.5 ? DELIMITER + topic : topic; + } + + public static String randomTopicFilter(int maxLevel) { + int levels = ThreadLocalRandom.current().nextInt(1, maxLevel + 1); + String[] filterLevels = new String[levels]; + for (int i = 0; i < levels; i++) { + filterLevels[i] = RandomTopicName.nextString(8, true, i < levels - 1); + } + return String.join(DELIMITER, filterLevels); + } + + public static List expand(String topic) { + List topicLevels = parse(topic, false); + List topicFilters = newArrayList(); + LinkedList> toVisit = newLinkedList(); + String rootLevel = topicLevels.get(0); + if (rootLevel.startsWith(SYS_PREFIX)) { + // sys topic don't match "#" and "+" + toVisit.add(newLinkedList(singleton(rootLevel))); + } else { + toVisit.addAll(toFilters(rootLevel)); + } + while (!toVisit.isEmpty()) { + LinkedList currentFilterLevels = toVisit.pollFirst(); + switch (currentFilterLevels.getLast()) { + case MULTI_WILDCARD: + topicFilters.add(fastJoin(NUL, currentFilterLevels)); + break; + case SINGLE_WILDCARD: + default: + if (currentFilterLevels.size() == topicLevels.size()) { + String tf = fastJoin(NUL, currentFilterLevels); + topicFilters.add(tf); + if (!currentFilterLevels.getLast().isEmpty()) { + // "#" also matches parent level if current level name is not empty string + topicFilters.add(fastJoin(NUL, List.of(tf, MULTI_WILDCARD))); + } + } else { + toFilters(topicLevels.get(currentFilterLevels.size())) + .descendingIterator() + .forEachRemaining(f -> { + currentFilterLevels.descendingIterator().forEachRemaining(f::addFirst); + toVisit.addFirst(f); + }); + } + } + } + return topicFilters; + } + + public static Optional findNext(List topicLevels, String escapedTopicFilter) { + // the algorithm is O(log(n)) + String rootLevel = topicLevels.get(0); + LinkedList> toVisit = newLinkedList(); + if (rootLevel.startsWith(SYS_PREFIX)) { + // sys topic don't match "#" and "+" + toVisit.add(newLinkedList(singleton(rootLevel))); + } else { + toVisit.addAll(toFilters(rootLevel)); + } + + String nextFilter = null; + out: + while (!toVisit.isEmpty()) { + LinkedList currentFilterLevels = toVisit.pollFirst(); + switch (currentFilterLevels.getLast()) { + case MULTI_WILDCARD: { + String current = fastJoin(NUL, currentFilterLevels); + if (escapedTopicFilter.compareTo(current) < 0) { + nextFilter = current; + break out; + } else { + while (!toVisit.isEmpty()) { + String next = fastJoin(NUL, toVisit.peekFirst()); + if (escapedTopicFilter.compareTo(next) <= 0 || escapedTopicFilter.startsWith(next)) { + break; + } else { + toVisit.pollFirst(); + } + } + } + break; + } + case SINGLE_WILDCARD: + default: { + if (currentFilterLevels.size() == topicLevels.size()) { + String current = fastJoin(NUL, currentFilterLevels); + if (escapedTopicFilter.compareTo(current) < 0) { + nextFilter = current; + break out; + } + current = fastJoin(NUL, List.of(current, MULTI_WILDCARD)); + if (escapedTopicFilter.compareTo(current) < 0) { + nextFilter = current; + break out; + } + while (!toVisit.isEmpty()) { + String next = fastJoin(NUL, toVisit.peekFirst()); + if (escapedTopicFilter.compareTo(next) <= 0 || escapedTopicFilter.startsWith(next)) { + break; + } else { + toVisit.pollFirst(); + } + } + } else { + toFilters(topicLevels.get(currentFilterLevels.size())).descendingIterator() + .forEachRemaining(f -> { + currentFilterLevels.descendingIterator().forEachRemaining(f::addFirst); + toVisit.addFirst(f); + }); + } + + } + } + } + return Optional.ofNullable(nextFilter); + } + + private static LinkedList> toFilters(String topicLevel) { + LinkedList> filters = newLinkedList(); + if (MULTI_WILDCARD.compareTo(topicLevel) > 0) { + filters.add(newLinkedList(singleton(topicLevel))); + filters.add(newLinkedList(singleton(MULTI_WILDCARD))); + filters.add(newLinkedList(singleton(SINGLE_WILDCARD))); + } else if (SINGLE_WILDCARD.compareTo(topicLevel) > 0) { + filters.add(newLinkedList(singleton(MULTI_WILDCARD))); + filters.add(newLinkedList(singleton(topicLevel))); + filters.add(newLinkedList(singleton(SINGLE_WILDCARD))); + } else { + filters.add(newLinkedList(singleton(MULTI_WILDCARD))); + filters.add(newLinkedList(singleton(SINGLE_WILDCARD))); + filters.add(newLinkedList(singleton(topicLevel))); + } + return filters; + } + + private static class RandomTopicName { + public static final String upper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ你好😄"; + + public static final String lower = upper.toLowerCase(Locale.ROOT); + + public static final String digits = "0123456789"; + + public static final String puncs = " !\"$%&'()*,-."; + + private static final ThreadLocalRandom random = ThreadLocalRandom.current(); + + private static final char[] symbols = (upper + lower + digits + puncs).toCharArray(); + + public static String nextString(int maxLength) { + int strLen = random.nextInt(1, maxLength); + char[] buf = new char[strLen]; + for (int idx = 0; idx < buf.length; ++idx) { + buf[idx] = symbols[random.nextInt(symbols.length)]; + } + return new String(buf); + } + + public static String nextString(int maxLength, boolean includeWildcard, boolean singleWildcardOnly) { + if (includeWildcard && random.nextFloat() > 0.5) { + if (singleWildcardOnly) { + return "+"; + } else { + return "#"; + } + } else { + return nextString(maxLength); + } + } + } +} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtilTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtilTest.java new file mode 100644 index 000000000..84c5fcf9a --- /dev/null +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TestUtilTest.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.dist; + +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.Lists; +import java.util.List; +import org.testng.annotations.Test; + +public class TestUtilTest { + @Test + public void testExpand() { + List topicFilters = TestUtil.expand(TestUtil.randomTopic()); + List copy = Lists.newArrayList(topicFilters); + topicFilters.sort(String::compareTo); + assertEquals(topicFilters, copy); + } + + @Test + public void testExpandSysTopic() { + String topic = "$sys/a/b/c"; + List topicFilters = TestUtil.expand(topic); + List copy = Lists.newArrayList(topicFilters); + topicFilters.sort(String::compareTo); + assertEquals(topicFilters, copy); + } +} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicMatcher.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcher.java similarity index 96% rename from bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicMatcher.java rename to bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcher.java index 322597bd4..e1878bbb3 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/main/java/com/baidu/bifromq/dist/util/TopicMatcher.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcher.java @@ -11,14 +11,15 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.util; +package com.baidu.bifromq.dist; -import static com.baidu.bifromq.dist.util.TopicUtil.findNext; +import static com.baidu.bifromq.dist.TestUtil.findNext; import static com.baidu.bifromq.util.TopicConst.MULTI_WILDCARD; import static com.baidu.bifromq.util.TopicConst.SINGLE_WILDCARD; import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; +import com.baidu.bifromq.dist.util.TopicUtil; import java.util.List; import java.util.Optional; diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java new file mode 100644 index 000000000..a3e69c7fb --- /dev/null +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/TopicMatcherTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.dist; + +import static com.baidu.bifromq.dist.util.TopicUtil.escape; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import com.google.common.collect.Sets; +import java.util.List; +import java.util.NavigableSet; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.Test; + +@Slf4j +public class TopicMatcherTest { + @Test + public void testMatch() { + String topic = TestUtil.randomTopic(); + TopicMatcher matcher = new TopicMatcher(topic); + List topicFilters = TestUtil.expand(topic); + log.debug("Topic '{}' matches {} TopicFilters", topic, topicFilters.size()); + for (int i = topicFilters.size() - 1; i >= 0; i--) { + String topicFilter = topicFilters.get(i); + assertTrue(matcher.match(topicFilter)); + if (i < topicFilters.size() - 1) { + assertEquals(matcher.next(topicFilter).get(), topicFilters.get(i + 1)); + } else { + assertFalse(matcher.next(topicFilter).isPresent()); + } + } + } + + @Test + public void testMatchSysTopic() { + String topic = "$sys/baidu/user/event/abc"; + TopicMatcher matcher = new TopicMatcher(topic); + List topicFilters = TestUtil.expand(topic); + for (int i = topicFilters.size() - 1; i >= 0; i--) { + String topicFilter = topicFilters.get(i); + assertTrue(matcher.match(topicFilter)); + if (i < topicFilters.size() - 1) { + assertEquals(matcher.next(topicFilter).get(), topicFilters.get(i + 1)); + } else { + assertFalse(matcher.next(topicFilter).isPresent()); + } + } + assertFalse(matcher.match("#")); + assertFalse(matcher.match("+")); + assertFalse(matcher.match("+/+/+/+/+")); + } + + @Test + public void testMatchRandomly() { + int j = 100; + while (j-- > 0) { + String topic = TestUtil.randomTopic(); + TopicMatcher matcher = new TopicMatcher(topic); + NavigableSet topicFilters = Sets.newTreeSet(TestUtil.expand(topic)); + int i = 100; + while (i-- > 0) { + String topicFilter = TestUtil.randomTopicFilter(); + boolean matched = matcher.match(escape(topicFilter)); + if (!Optional.ofNullable(topicFilters.higher(escape(topicFilter))) + .equals(matcher.next(escape(topicFilter)))) { + log.info("'{}' matches '{}'? {}, next '{}'", + topicFilter, topic, matched, matcher.next(escape(topicFilter)).orElse(null)); + fail(); + } + } + } + } +} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/Fixtures.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/Fixtures.java index 2840d931b..0c5a6cdf6 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/Fixtures.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/Fixtures.java @@ -30,6 +30,19 @@ public class Fixtures { "tenantA/a", "tenantA/a/#" )); + GlobalTopicToFilters.put("tenantA/a/b", List.of( + "tenantA/#", + "tenantA/+/#", + "tenantA/+/+", + "tenantA/+/+/#", + "tenantA/+/b", + "tenantA/+/b/#", + "tenantA/a/#", + "tenantA/a/+", + "tenantA/a/+/#", + "tenantA/a/b", + "tenantA/a/b/#" + )); GlobalTopicToFilters.put("tenantA/$sys/a", List.of( "tenantA/$sys/#", @@ -61,6 +74,7 @@ public class Fixtures { "a/#" )); + LocalTopicToFilters.put("$sys/a", List.of( "$sys/#", "$sys/+", diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java index 0a60ada72..ce5bd4875 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/TopicFilterIteratorTest.java @@ -13,8 +13,8 @@ package com.baidu.bifromq.dist.trie; -import static com.baidu.bifromq.dist.util.TestUtil.randomTopic; -import static com.baidu.bifromq.dist.util.TestUtil.randomTopicFilter; +import static com.baidu.bifromq.dist.TestUtil.randomTopic; +import static com.baidu.bifromq.dist.TestUtil.randomTopicFilter; import static com.baidu.bifromq.dist.util.TopicUtil.escape; import static com.baidu.bifromq.dist.util.TopicUtil.fastJoin; import static com.baidu.bifromq.dist.util.TopicUtil.parse; @@ -22,7 +22,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import com.baidu.bifromq.dist.TestUtil; import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.util.TopicConst; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -44,7 +46,7 @@ public void expandRandomLocalTopic() { for (; itr.isValid(); itr.next()) { generated.add(fastJoin(NUL, itr.key())); } - List allFilters = TopicUtil.expand(topic); + List allFilters = TestUtil.expand(topic); assertEquals(generated, allFilters); } @@ -295,6 +297,32 @@ public void associatedValues() { assertEquals(itr.value().get(List.of("a", "b")), Set.of("v2")); } + @Test + public void localSysTopicMatch() { + TopicTrieNode.Builder builder = TopicTrieNode.builder(false); + builder.addTopic(parse("$sys/a", false), "v1"); + builder.addTopic(parse("a/b", false), "v2"); + builder.addTopic(parse("c", false), "v3"); + TopicFilterIterator itr = new TopicFilterIterator<>(builder.build()); + itr.seek(List.of("#")); + assertEquals(itr.value().size(), 2); + assertEquals(itr.value().get(List.of("a", "b")), Set.of("v2")); + assertEquals(itr.value().get(List.of("c")), Set.of("v3")); + } + + @Test + public void globalSysTopicMatch() { + TopicTrieNode.Builder builder = TopicTrieNode.builder(true); + builder.addTopic(parse("tenant/$sys/a", false), "v1"); + builder.addTopic(parse("tenant/a/b", false), "v2"); + builder.addTopic(parse("tenant/c", false), "v3"); + TopicFilterIterator itr = new TopicFilterIterator<>(builder.build()); + itr.seek(List.of("tenant", "#")); + assertEquals(itr.value().size(), 2); + assertEquals(itr.value().get(List.of("tenant", "a", "b")), Set.of("v2")); + assertEquals(itr.value().get(List.of("tenant", "c")), Set.of("v3")); + } + private void expandTopics(Map> topicToFilters, boolean isGlobal) { TopicTrieNode.Builder topicTrieBuilder = TopicTrieNode.builder(isGlobal); Set allTopicFilters = new HashSet<>(); @@ -307,6 +335,7 @@ private void expandTopics(Map> topicToFilters, boolean isGl List generated = new ArrayList<>(); for (; iterator.isValid(); iterator.next()) { generated.add(fastJoin(NUL, iterator.key())); + log.info("{}-{}", fastJoin(TopicConst.DELIMITER, iterator.key()), iterator.value().values()); } assertEquals(sortedTopicFilters, generated); } diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/benchmark/TopicTrieBuilderBenchmark.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmark.java similarity index 93% rename from bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/benchmark/TopicTrieBuilderBenchmark.java rename to bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmark.java index 38980b851..6629ff3cd 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/benchmark/TopicTrieBuilderBenchmark.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmark.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.util.benchmark; +package com.baidu.bifromq.dist.trie.benchmark; import lombok.SneakyThrows; import org.openjdk.jmh.annotations.Benchmark; @@ -41,6 +41,6 @@ public static void main(String[] args) { @Threads(1) @Fork(1) public void testBuild(TopicTrieBuilderBenchmarkState state) { - state.topicTrie.add(state.randomTopic(), state.distMessages); + state.topicTrieBuilder.addTopic(state.randomTopic(), "value"); } } diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/benchmark/TopicTrieBuilderBenchmarkState.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmarkState.java similarity index 69% rename from bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/benchmark/TopicTrieBuilderBenchmarkState.java rename to bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmarkState.java index d4a3164d8..52d4d9120 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/benchmark/TopicTrieBuilderBenchmarkState.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/trie/benchmark/TopicTrieBuilderBenchmarkState.java @@ -11,11 +11,11 @@ * See the License for the specific language governing permissions and limitations under the License. */ -package com.baidu.bifromq.dist.util.benchmark; +package com.baidu.bifromq.dist.trie.benchmark; -import com.baidu.bifromq.dist.util.TestUtil; -import com.baidu.bifromq.dist.util.TopicTrie; -import com.baidu.bifromq.type.TopicMessage; +import com.baidu.bifromq.dist.trie.TopicTrieNode; +import com.baidu.bifromq.dist.TestUtil; +import com.baidu.bifromq.dist.util.TopicUtil; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.openjdk.jmh.annotations.Level; @@ -27,12 +27,11 @@ @Slf4j @State(Scope.Thread) public class TopicTrieBuilderBenchmarkState { - public TopicTrie topicTrie; - public final List distMessages = List.of(TopicMessage.getDefaultInstance()); + public TopicTrieNode.Builder topicTrieBuilder; @Setup(Level.Iteration) public void setup() { - topicTrie = new TopicTrie(); + topicTrieBuilder = TopicTrieNode.builder(false); } /* @@ -40,11 +39,10 @@ public void setup() { */ @TearDown(Level.Iteration) - public void teardown() { - log.info("topic count: {}", topicTrie.topicCount()); + public void tearDown() { } - public String randomTopic() { - return TestUtil.randomTopic(); + public List randomTopic() { + return TopicUtil.parse(TestUtil.randomTopic(), true); } } diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TestUtil.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TestUtil.java deleted file mode 100644 index 7d1062683..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TestUtil.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.util.TopicConst.DELIMITER; -import static com.baidu.bifromq.util.TopicConst.SYS_PREFIX; - -import java.util.Locale; -import java.util.concurrent.ThreadLocalRandom; - -public class TestUtil { - public static String randomTopic() { - return randomTopic(16); - } - - public static String randomTopicFilter() { - return randomTopicFilter(16); - } - - public static String randomTopic(int maxLevel) { - int levels = ThreadLocalRandom.current().nextInt(1, maxLevel + 1); - String[] topicLevels = new String[levels]; - for (int i = 0; i < levels; i++) { - topicLevels[i] = RandomTopicName.nextString(8); - } - if (ThreadLocalRandom.current().nextFloat() > 0.5) { - topicLevels[0] = SYS_PREFIX + topicLevels[0];// make - } - String topic = String.join(DELIMITER, topicLevels); - return ThreadLocalRandom.current().nextFloat() > 0.5 ? DELIMITER + topic : topic; - } - - public static String randomTopicFilter(int maxLevel) { - int levels = ThreadLocalRandom.current().nextInt(1, maxLevel + 1); - String[] filterLevels = new String[levels]; - for (int i = 0; i < levels; i++) { - filterLevels[i] = RandomTopicName.nextString(8, true, i < levels - 1); - } - return String.join(DELIMITER, filterLevels); - } - - private static class RandomTopicName { - public static final String upper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ你好😄"; - - public static final String lower = upper.toLowerCase(Locale.ROOT); - - public static final String digits = "0123456789"; - - public static final String puncs = " !\"$%&'()*,-."; - - private static final ThreadLocalRandom random = ThreadLocalRandom.current(); - - private static final char[] symbols = (upper + lower + digits + puncs).toCharArray(); - - public static String nextString(int maxLength) { - int strLen = random.nextInt(1, maxLength); - char[] buf = new char[strLen]; - for (int idx = 0; idx < buf.length; ++idx) { - buf[idx] = symbols[random.nextInt(symbols.length)]; - } - return new String(buf); - } - - public static String nextString(int maxLength, boolean includeWildcard, boolean singleWildcardOnly) { - if (includeWildcard && random.nextFloat() > 0.5) { - if (singleWildcardOnly) { - return "+"; - } else { - return "#"; - } - } else { - return nextString(maxLength); - } - } - } -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterMatcherTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterMatcherTest.java deleted file mode 100644 index 82f3b9b8a..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterMatcherTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - - -import static com.baidu.bifromq.dist.util.TopicUtil.escape; -import static com.baidu.bifromq.dist.util.TopicUtil.parse; -import static com.baidu.bifromq.dist.util.TopicUtil.unescape; -import static java.util.Collections.singleton; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import com.baidu.bifromq.type.TopicMessage; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeSet; -import java.util.concurrent.ThreadLocalRandom; -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.Test; - -@Slf4j -public class TopicFilterMatcherTest { - - @Test - public void testParse() { - assertEquals(parse("/", false), Lists.newArrayList("", "")); - assertEquals(parse(escape("/"), true), Lists.newArrayList("", "")); - assertEquals(parse("//", false), Lists.newArrayList("", "", "")); - assertEquals(parse(escape("//"), true), Lists.newArrayList("", "", "")); - assertEquals(parse(" //", false), Lists.newArrayList(" ", "", "")); - assertEquals(parse(escape(" //"), true), Lists.newArrayList(" ", "", "")); - assertEquals(parse(" / / ", false), Lists.newArrayList(" ", " ", " ")); - assertEquals(parse(escape(" / / "), true), Lists.newArrayList(" ", " ", " ")); - assertEquals(parse("a/", false), Lists.newArrayList("a", "")); - assertEquals(parse(escape("a/"), true), Lists.newArrayList("a", "")); - assertEquals(parse("a/b", false), Lists.newArrayList("a", "b")); - assertEquals(parse(escape("a/b"), true), Lists.newArrayList("a", "b")); - assertEquals(parse("a/b/", false), Lists.newArrayList("a", "b", "")); - assertEquals(parse(escape("a/b/"), true), Lists.newArrayList("a", "b", "")); - } - -// @Test -// public void testExpand() { -// TopicTrieBuilder builder = new TopicTrieBuilder(); -// builder.add("a/", List.of(ClientMessages.getDefaultInstance())); -// TopicTrieNode root = builder.build(); -// -// TopicUtil.expand(root).forEach(tf -> log.info("TopicFilter {}", TopicUtil.unescape(tf))); -// } - - @Test - public void testEmptyMatcher() { - TopicTrie trie = new TopicTrie(); - TopicFilterMatcher topicFilterMatcher = new TopicFilterMatcher(trie); - assertTrue(TopicUtil.expand(trie).isEmpty()); - assertFalse(topicFilterMatcher.match(TopicUtil.escape(TestUtil.randomTopicFilter())).isPresent()); - } - - @Test - public void testMatch() { - long s = System.nanoTime(); - List topics = new ArrayList<>(); - int topicCount = ThreadLocalRandom.current().nextInt(1, 11); - log.info("Expanding {} topics", topicCount); - for (int i = 0; i < topicCount; i++) { - String topic = TestUtil.randomTopic(); - topics.add(topic); - } - TopicTrie trie = new TopicTrie(); - topics.forEach( - topic -> trie.add(topic, singleton(TopicMessage.newBuilder().setTopic(topic).build()))); - List topicFilters = TopicUtil.expand(trie); - log.info("Expand {} topics into {} topic filters costs {}ms", topicCount, topicFilters.size(), - Duration.ofNanos(System.nanoTime() - s).toMillis()); - TreeSet verifySet = Sets.newTreeSet(topicFilters); - assertEquals(topicFilters, Lists.newArrayList(verifySet)); - - TopicFilterMatcher matcher = new TopicFilterMatcher(trie); - for (String topicFilter : topicFilters) { - Optional>> matched = matcher.match(topicFilter); - assertTrue(matched.isPresent()); - assertTrue(matched.get().keySet().stream().anyMatch(t -> topics.contains(t))); - assertTrue(verifySet.contains(topicFilter)); - } - } - - @Test - public void specialCases() { - TopicTrie trie = new TopicTrie(); - trie.add("a", singleton(TopicMessage.newBuilder().setTopic("a").build())); - trie.add("a/b", singleton(TopicMessage.newBuilder().setTopic("a/b").build())); - TopicFilterMatcher matcher = new TopicFilterMatcher(trie); - assertTrue(matcher.match(escape("a/#")).get().containsKey("a")); - assertTrue(matcher.match(escape("a/#")).get().containsKey("a/b")); - assertTrue(matcher.match(escape("b/#")).isEmpty()); - assertTrue(matcher.match(escape("a/b/#")).get().containsKey("a/b")); - assertTrue(matcher.match(escape("a/+")).get().containsKey("a/b")); - assertTrue(matcher.match(escape("a/+/#")).get().containsKey("a/b")); - assertTrue(matcher.match(escape("a/c/#")).isEmpty()); - - trie = new TopicTrie(); - trie.add("/", singleton(TopicMessage.newBuilder().setTopic("a").build())); - trie.add("/a", singleton(TopicMessage.newBuilder().setTopic("a/b").build())); - matcher = new TopicFilterMatcher(trie); - assertTrue(matcher.match(escape("+")).isEmpty()); - assertTrue(matcher.match(escape("+/+")).get().containsKey("/")); - assertTrue(matcher.match(escape("+/+")).get().containsKey("/a")); - - assertTrue(matcher.match(escape("#")).get().containsKey("/")); - assertTrue(matcher.match(escape("#")).get().containsKey("/a")); - - assertTrue(matcher.match(escape("+/#")).get().containsKey("/")); - assertTrue(matcher.match(escape("+/#")).get().containsKey("/a")); - - assertTrue(matcher.match(escape("/b/#")).isEmpty()); - assertTrue(matcher.match(escape("+/b")).isEmpty()); - } - - @Test - public void testFindingHigherTopicFilter() { - long s = System.nanoTime(); - List topics = new ArrayList<>(); - int topicCount = ThreadLocalRandom.current().nextInt(1, 11); - log.info("Expanding {} topics", topicCount); - for (int i = 0; i < topicCount; i++) { - String topic = TestUtil.randomTopic(); - topics.add(topic); - } - TopicTrie trie = new TopicTrie(); - topics.forEach( - topic -> trie.add(topic, singleton(TopicMessage.newBuilder().setTopic(topic).build()))); - List topicFilters = TopicUtil.expand(trie); - log.info("Expand {} topics into {} topic filters costs {}ms", topicCount, topicFilters.size(), - Duration.ofNanos(System.nanoTime() - s).toMillis()); - TreeSet verifySet = Sets.newTreeSet(topicFilters); - assertEquals(topicFilters, Lists.newArrayList(verifySet)); - - - // test set includes additional random topic filters which may not match - TreeSet testSet = Sets.newTreeSet(topicFilters); - for (int i = 0; i < verifySet.size(); i++) { - testSet.add(TestUtil.randomTopicFilter()); - } - - TopicFilterMatcher matcher = new TopicFilterMatcher(trie); - for (String topicFilter : testSet) { - Optional nextTopicFilter = matcher.higher(topicFilter); - if (!Optional.ofNullable(verifySet.higher(topicFilter)).equals(nextTopicFilter)) { - log.error("Expect {} after matching {}, but is {}", - unescape(verifySet.higher(topicFilter)), - unescape(topicFilter), - unescape(nextTopicFilter.orElse(""))); - fail(); - } - } - } - -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterTrieTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterTrieTest.java deleted file mode 100644 index aa8c1519f..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicFilterTrieTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static com.baidu.bifromq.dist.util.TestUtil.randomTopic; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; - -import com.baidu.bifromq.type.TopicMessage; -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.List; -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.Test; - -@Slf4j -public class TopicFilterTrieTest { - - @Test - public void iterateOverEmptyTrie() { - TopicFilterTrieIterator itr = new TopicFilterTrieIterator(new TopicTrie()); - assertFalse(itr.hasNext()); - } - - @Test - public void expandTopic() { - TopicTrie topicTrie = new TopicTrie(); - String topic = randomTopic(); - topicTrie.add(topic, Collections.singleton(TopicMessage.newBuilder().setTopic(topic).build())); - TopicFilterTrieIterator itr = new TopicFilterTrieIterator(topicTrie); - List allFilters = TopicUtil.expand(topicTrie); - assertEquals(Lists.newArrayList(itr), allFilters); - } -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicTrieBuilderTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicTrieBuilderTest.java deleted file mode 100644 index 33dc79a89..000000000 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicTrieBuilderTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.dist.util; - -import static org.testng.Assert.assertEquals; - -import com.baidu.bifromq.type.TopicMessage; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.Test; - -@Slf4j -public class TopicTrieBuilderTest { - @Test - public void testStats() { - TopicTrie trie = new TopicTrie(); - assertEquals(trie.topicCount(), 0); - - trie.add("a", Collections.singleton(TopicMessage.newBuilder().setTopic("a").build())); - assertEquals(trie.topicCount(), 1); - - trie.add("a", Collections.singleton(TopicMessage.newBuilder().setTopic("a").build())); - assertEquals(trie.topicCount(), 1); - - trie.add("a/", Collections.singleton(TopicMessage.newBuilder().setTopic("a/").build())); - assertEquals(trie.topicCount(), 2); - } - - @Test - public void testIterate() { - TopicTrie trie = new TopicTrie(); - int topicCount = 3; - List topics = new ArrayList<>(topicCount); - for (int i = 0; i < topicCount; i++) { - topics.add("" + i); - } - topics.sort(Comparator.comparing(TopicUtil::escape)); - topics.forEach( - topic -> trie.add(topic, Collections.singleton(TopicMessage.newBuilder().setTopic(topic).build()))); - TrieNode root = trie.root(); - - List toVerify = new ArrayList<>(); - TopicTrieIterator.iterate(root, (topic, msgs) -> toVerify.add(topic)); - for (int i = 0; i < topics.size(); i++) { - if (!topics.get(i).equals(toVerify.get(i))) { - log.info("{}: {}", i, topics.get(i)); - } - } - assertEquals(toVerify, topics); - } -} diff --git a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicUtilTest.java b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicUtilTest.java index 6c2537691..8dab26511 100644 --- a/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicUtilTest.java +++ b/bifromq-dist/bifromq-dist-rpc-definition/src/test/java/com/baidu/bifromq/dist/util/TopicUtilTest.java @@ -20,13 +20,8 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.util.List; -import java.util.NavigableSet; -import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.testng.annotations.Test; @@ -61,77 +56,4 @@ public void testParse() { assertEquals(parse(escape("a/b/"), true), Lists.newArrayList("a", "b", "")); } - @Test - public void testExpand() { - List topicFilters = TopicUtil.expand(TestUtil.randomTopic()); - List copy = Lists.newArrayList(topicFilters); - topicFilters.sort(String::compareTo); - assertEquals(topicFilters, copy); - } - - @Test - public void testExpandSysTopic() { - String topic = "$sys/a/b/c"; - List topicFilters = TopicUtil.expand(topic); - List copy = Lists.newArrayList(topicFilters); - topicFilters.sort(String::compareTo); - assertEquals(topicFilters, copy); - } - - @Test - public void testMatch() { - String topic = TestUtil.randomTopic(); - TopicMatcher matcher = new TopicMatcher(topic); - List topicFilters = TopicUtil.expand(topic); - log.debug("Topic '{}' matches {} TopicFilters", topic, topicFilters.size()); - for (int i = topicFilters.size() - 1; i >= 0; i--) { - String topicFilter = topicFilters.get(i); - assertTrue(matcher.match(topicFilter)); - if (i < topicFilters.size() - 1) { - assertEquals(matcher.next(topicFilter).get(), topicFilters.get(i + 1)); - } else { - assertFalse(matcher.next(topicFilter).isPresent()); - } - } - } - - @Test - public void testMatchSysTopic() { - String topic = "$sys/baidu/user/event/abc"; - TopicMatcher matcher = new TopicMatcher(topic); - List topicFilters = TopicUtil.expand(topic); - for (int i = topicFilters.size() - 1; i >= 0; i--) { - String topicFilter = topicFilters.get(i); - assertTrue(matcher.match(topicFilter)); - if (i < topicFilters.size() - 1) { - assertEquals(matcher.next(topicFilter).get(), topicFilters.get(i + 1)); - } else { - assertFalse(matcher.next(topicFilter).isPresent()); - } - } - assertFalse(matcher.match("#")); - assertFalse(matcher.match("+")); - assertFalse(matcher.match("+/+/+/+/+")); - } - - @Test - public void testMatchRandomly() { - int j = 100; - while (j-- > 0) { - String topic = TestUtil.randomTopic(); - TopicMatcher matcher = new TopicMatcher(topic); - NavigableSet topicFilters = Sets.newTreeSet(TopicUtil.expand(topic)); - int i = 100; - while (i-- > 0) { - String topicFilter = TestUtil.randomTopicFilter(); - boolean matched = matcher.match(escape(topicFilter)); - if (!Optional.ofNullable(topicFilters.higher(escape(topicFilter))) - .equals(matcher.next(escape(topicFilter)))) { - log.info("'{}' matches '{}'? {}, next '{}'", - topicFilter, topic, matched, matcher.next(escape(topicFilter)).orElse(null)); - fail(); - } - } - } - } } diff --git a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistService.java b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistService.java index 1ad56f392..44f2e567d 100644 --- a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistService.java +++ b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistService.java @@ -69,8 +69,7 @@ public class DistService extends DistServiceGrpc.DistServiceImplBase { tenantFanouts = Caffeine.newBuilder() .expireAfterAccess(120, TimeUnit.SECONDS) .build(k -> new RunningAverage(5)); - this.distCallScheduler = new DistCallScheduler(this.distCallRateScheduler, distWorkerClient, - tenantId -> tenantFanouts.get(tenantId).estimate()); + this.distCallScheduler = new DistCallScheduler(this.distCallRateScheduler, distWorkerClient, settingProvider); } @Override diff --git a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java index fc438dea6..14347d63d 100644 --- a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java +++ b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java @@ -13,11 +13,14 @@ package com.baidu.bifromq.dist.server.scheduler; -import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static com.baidu.bifromq.dist.entity.EntityUtil.matchRecordKeyPrefix; import static com.baidu.bifromq.dist.entity.EntityUtil.tenantUpperBound; +import static com.baidu.bifromq.util.TopicConst.NUL; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeRouterUtil; import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.basekv.proto.KVRangeId; @@ -29,13 +32,18 @@ import com.baidu.bifromq.basescheduler.IBatchCall; import com.baidu.bifromq.basescheduler.ICallScheduler; import com.baidu.bifromq.basescheduler.ICallTask; +import com.baidu.bifromq.dist.entity.EntityUtil; import com.baidu.bifromq.dist.rpc.proto.BatchDistReply; import com.baidu.bifromq.dist.rpc.proto.BatchDistRequest; import com.baidu.bifromq.dist.rpc.proto.DistPack; import com.baidu.bifromq.dist.rpc.proto.DistServiceROCoProcInput; +import com.baidu.bifromq.dist.trie.TopicFilterIterator; +import com.baidu.bifromq.dist.trie.TopicTrieNode; +import com.baidu.bifromq.dist.util.TopicUtil; +import com.baidu.bifromq.plugin.settingprovider.ISettingProvider; +import com.baidu.bifromq.plugin.settingprovider.Setting; import com.baidu.bifromq.sysprops.props.DataPlaneBurstLatencyMillis; import com.baidu.bifromq.sysprops.props.DataPlaneTolerableLatencyMillis; -import com.baidu.bifromq.sysprops.props.DistWorkerFanOutSplitThreshold; import com.baidu.bifromq.type.ClientInfo; import com.baidu.bifromq.type.Message; import com.baidu.bifromq.type.TopicMessagePack; @@ -43,31 +51,31 @@ import java.time.Duration; import java.util.ArrayDeque; import java.util.HashMap; -import java.util.LinkedList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @Slf4j public class DistCallScheduler extends BatchCallScheduler, Integer> implements IDistCallScheduler { + private final ISettingProvider settingProvider; private final IBaseKVStoreClient distWorkerClient; - private final Function tenantFanoutGetter; - private final int fanoutSplitThreshold = DistWorkerFanOutSplitThreshold.INSTANCE.get(); public DistCallScheduler(ICallScheduler reqScheduler, IBaseKVStoreClient distWorkerClient, - Function tenantFanoutGetter) { + ISettingProvider settingProvider) { super("dist_server_dist_batcher", reqScheduler, Duration.ofMillis(DataPlaneTolerableLatencyMillis.INSTANCE.get()), Duration.ofMillis(DataPlaneBurstLatencyMillis.INSTANCE.get())); + this.settingProvider = settingProvider; this.distWorkerClient = distWorkerClient; - this.tenantFanoutGetter = tenantFanoutGetter; } @Override @@ -76,9 +84,7 @@ protected Batcher, Integer> newBatcher(Stri long burstLatencyNanos, Integer batchKey) { return new DistWorkerCallBatcher(batchKey, name, tolerableLatencyNanos, burstLatencyNanos, - fanoutSplitThreshold, - distWorkerClient, - tenantFanoutGetter); + distWorkerClient, settingProvider); } @Override @@ -87,21 +93,18 @@ protected Optional find(DistWorkerCall request) { } private static class DistWorkerCallBatcher extends Batcher, Integer> { + private final ISettingProvider settingProvider; private final IBaseKVStoreClient distWorkerClient; private final String orderKey = UUID.randomUUID().toString(); - private final Function tenantFanoutGetter; - private final int fanoutSplitThreshold; protected DistWorkerCallBatcher(Integer batcherKey, String name, long tolerableLatencyNanos, long burstLatencyNanos, - int fanoutSplitThreshold, IBaseKVStoreClient distWorkerClient, - Function tenantFanoutGetter) { + ISettingProvider settingProvider) { super(batcherKey, name, tolerableLatencyNanos, burstLatencyNanos); + this.settingProvider = settingProvider; this.distWorkerClient = distWorkerClient; - this.tenantFanoutGetter = tenantFanoutGetter; - this.fanoutSplitThreshold = fanoutSplitThreshold; } @Override @@ -111,83 +114,73 @@ protected IBatchCall, Integer> newBatch() { private class BatchDistCall implements IBatchCall, Integer> { private final Queue, Integer>> tasks = new ArrayDeque<>(128); - private Map>>> batch = new HashMap<>(128); + private Map>> batch = new HashMap<>(128); + private Map> topicsByTenantId = new HashMap<>(); @Override public void reset() { batch = new HashMap<>(128); + topicsByTenantId = new HashMap<>(); } @Override public void add(ICallTask, Integer> callTask) { - Map>> clientMsgsByTopic = - batch.computeIfAbsent(callTask.call().tenantId, k -> new HashMap<>()); - callTask.call().publisherMsgPacks.forEach(senderMsgPack -> - senderMsgPack.getMessagePackList().forEach(topicMsgs -> - clientMsgsByTopic.computeIfAbsent(topicMsgs.getTopic(), k -> new HashMap<>()) - .compute(senderMsgPack.getPublisher(), (k, v) -> { + callTask.call().publisherMsgPacks.forEach(publisherMsgPack -> + publisherMsgPack.getMessagePackList().forEach(topicMsgs -> { + GlobalTopic globalTopic = new GlobalTopic(callTask.call().tenantId, topicMsgs.getTopic()); + topicsByTenantId.computeIfAbsent(callTask.call().tenantId, k -> new HashSet<>()) + .add(globalTopic); + batch.computeIfAbsent(globalTopic, k -> new HashMap<>()) + .compute(publisherMsgPack.getPublisher(), (k, v) -> { if (v == null) { v = topicMsgs.getMessageList(); } else { v = Iterables.concat(v, topicMsgs.getMessageList()); } return v; - }))); + }); + })); tasks.add(callTask); } @Override public CompletableFuture execute() { - Map> distPacksByRangeReplica = new HashMap<>(); - batch.forEach((tenantId, topicMap) -> { - DistPack.Builder distPackBuilder = DistPack.newBuilder().setTenantId(tenantId); - topicMap.forEach((topic, senderMap) -> { - TopicMessagePack.Builder topicMsgPackBuilder = TopicMessagePack.newBuilder().setTopic(topic); - senderMap.forEach((sender, msgs) -> - topicMsgPackBuilder.addMessage(TopicMessagePack.PublisherPack - .newBuilder() - .setPublisher(sender) - .addAllMessage(msgs) - .build())); - distPackBuilder.addMsgPack(topicMsgPackBuilder.build()); - }); - DistPack distPack = distPackBuilder.build(); - int fanoutScale = tenantFanoutGetter.apply(tenantId); - List ranges = findByBoundary(Boundary.newBuilder() - .setStartKey(matchRecordKeyPrefix(tenantId)) - .setEndKey(tenantUpperBound(tenantId)) - .build(), distWorkerClient.latestEffectiveRouter()); - if (fanoutScale > fanoutSplitThreshold) { - ranges.forEach(range -> distPacksByRangeReplica.computeIfAbsent( - new KVRangeReplica(range.id, range.ver, range.leader), - k -> new LinkedList<>()).add(distPack)); - } else { - ranges.forEach(range -> distPacksByRangeReplica.computeIfAbsent( - new KVRangeReplica(range.id, range.ver, range.randomReplica()), - k -> new LinkedList<>()).add(distPack)); - } - }); - long reqId = System.nanoTime(); @SuppressWarnings("unchecked") - CompletableFuture[] distReplyFutures = distPacksByRangeReplica.entrySet().stream() + CompletableFuture[] distReplyFutures = replicaSelect(rangeLookup()) + .entrySet() + .stream() .map(entry -> { KVRangeReplica rangeReplica = entry.getKey(); - BatchDistRequest batchDist = BatchDistRequest.newBuilder() + Map>> replicaBatch = entry.getValue(); + BatchDistRequest.Builder batchDistBuilder = BatchDistRequest.newBuilder() .setReqId(reqId) - .addAllDistPack(entry.getValue()) - .setOrderKey(orderKey) - .build(); + .setOrderKey(orderKey); + replicaBatch.forEach((globalTopic, publisherMsgs) -> { + String tenantId = globalTopic.tenantId; + String topic = globalTopic.topic; + DistPack.Builder distPackBuilder = DistPack.newBuilder().setTenantId(tenantId); + TopicMessagePack.Builder topicMsgPackBuilder = + TopicMessagePack.newBuilder().setTopic(topic); + publisherMsgs.forEach((publisher, msgs) -> + topicMsgPackBuilder.addMessage(TopicMessagePack.PublisherPack + .newBuilder() + .setPublisher(publisher) + .addAllMessage(msgs) + .build())); + distPackBuilder.addMsgPack(topicMsgPackBuilder.build()); + batchDistBuilder.addDistPack(distPackBuilder.build()); + }); return distWorkerClient.query(rangeReplica.storeId, KVRangeRORequest.newBuilder() .setReqId(reqId) .setVer(rangeReplica.ver) .setKvRangeId(rangeReplica.id) .setRoCoProc(ROCoProcInput.newBuilder() .setDistService(DistServiceROCoProcInput.newBuilder() - .setBatchDist(batchDist) + .setBatchDist(batchDistBuilder.build()) .build()) .build()) - .build(), batchDist.getOrderKey()) + .build(), orderKey) .thenApply(v -> { if (v.getCode() == ReplyCode.Ok) { BatchDistReply batchDistReply = v.getRoCoProcResult() @@ -240,6 +233,144 @@ public CompletableFuture execute() { return null; }); } + + private Map>>> rangeLookup() { + Map>>> batchByRange = + new HashMap<>(); + if (distWorkerClient.latestEffectiveRouter().containsKey(FULL_BOUNDARY)) { + // no splitting + assert distWorkerClient.latestEffectiveRouter().size() == 1; + batchByRange.put(distWorkerClient.latestEffectiveRouter().get(FULL_BOUNDARY), batch); + } else { + for (String tenantId : topicsByTenantId.keySet()) { + List coveredRanges = KVRangeRouterUtil.findByBoundary(Boundary.newBuilder() + .setStartKey(matchRecordKeyPrefix(tenantId)) + .setEndKey(tenantUpperBound(tenantId)) + .build(), distWorkerClient.latestEffectiveRouter()); + if (coveredRanges.size() == 1) { + // one range per tenant mode + Map>> topicMsgs = + batchByRange.computeIfAbsent(coveredRanges.get(0), k -> new HashMap<>()); + for (GlobalTopic globalTopic : topicsByTenantId.get(tenantId)) { + topicMsgs.put(globalTopic, batch.get(globalTopic)); + } + batchByRange.put(coveredRanges.get(0), topicMsgs); + } else { + // multi ranges per tenant mode + for (GlobalTopic globalTopic : topicsByTenantId.get(tenantId)) { + String topic = globalTopic.topic; + if (!(boolean) settingProvider.provide(Setting.WildcardSubscriptionEnabled, tenantId)) { + // wildcard disabled + Optional rangeSetting = + findByKey(matchRecordKeyPrefix(tenantId, topic), + distWorkerClient.latestEffectiveRouter()); + assert rangeSetting.isPresent(); + batchByRange.computeIfAbsent(rangeSetting.get(), k -> new HashMap<>()) + .put(globalTopic, batch.get(globalTopic)); + } else { + // wildcard disabled, rough screening via 'one-pass' scan + Map> publisherMsg = batch.get(globalTopic); + TopicFilterIterator topicFilterIterator = + new TopicFilterIterator<>(TopicTrieNode.builder(true) + .addTopic(TopicUtil.parse(tenantId, topic, false), topic) + .build()); + for (Boundary boundary : distWorkerClient.latestEffectiveRouter().keySet()) { + KVRangeSetting rangeSetting = + distWorkerClient.latestEffectiveRouter().get(boundary); + if (!boundary.hasStartKey()) { + // left open range, must be the first range + EntityUtil.TenantAndEscapedTopicFilter endTenantAndEscapedTopicFilter = + EntityUtil.parseTenantAndEscapedTopicFilter(boundary.getEndKey()); + List globalTopicFilter = + TopicUtil.parse(endTenantAndEscapedTopicFilter.tenantId(), + endTenantAndEscapedTopicFilter.escapedTopicFilter(), true); + topicFilterIterator.seekPrev(globalTopicFilter); + if (topicFilterIterator.isValid()) { + batchByRange.computeIfAbsent(rangeSetting, k -> new HashMap<>()) + .computeIfAbsent(globalTopic, k -> new HashMap<>()) + .putAll(publisherMsg); + } + } else if (!boundary.hasEndKey()) { + // right open range, must be the last range + EntityUtil.TenantAndEscapedTopicFilter startTenantAndEscapedTopicFilter = + EntityUtil.parseTenantAndEscapedTopicFilter(boundary.getStartKey()); + List globalTopicFilter = + TopicUtil.parse(startTenantAndEscapedTopicFilter.tenantId(), + startTenantAndEscapedTopicFilter.escapedTopicFilter(), true); + topicFilterIterator.seek(globalTopicFilter); + if (topicFilterIterator.isValid()) { + batchByRange.computeIfAbsent(rangeSetting, k -> new HashMap<>()) + .computeIfAbsent(globalTopic, k -> new HashMap<>()) + .putAll(publisherMsg); + } + } else { + EntityUtil.TenantAndEscapedTopicFilter startTenantAndEscapedTopicFilter = + EntityUtil.parseTenantAndEscapedTopicFilter(boundary.getStartKey()); + List startGlobalTopicFilter = + TopicUtil.parse(startTenantAndEscapedTopicFilter.tenantId(), + startTenantAndEscapedTopicFilter.escapedTopicFilter(), true); + topicFilterIterator.seek(startGlobalTopicFilter); + if (topicFilterIterator.isValid()) { + String probeTopicFilter = + TopicUtil.fastJoin(NUL, topicFilterIterator.key()); + EntityUtil.TenantAndEscapedTopicFilter endTenantAndEscapedTopicFilter = + EntityUtil.parseTenantAndEscapedTopicFilter(boundary.getEndKey()); + String endTopicFilter = + endTenantAndEscapedTopicFilter.toGlobalTopicFilter(); + if (probeTopicFilter.compareTo(endTopicFilter) < 0) { + batchByRange.computeIfAbsent(rangeSetting, k -> new HashMap<>()) + .computeIfAbsent(globalTopic, k -> new HashMap<>()) + .putAll(publisherMsg); + } + } + } + } + } + } + } + } + } + return batchByRange; + } + + private Map>>> replicaSelect( + Map>>> batchByRange) { + Map>>> batchByReplica = + new HashMap<>(); + for (KVRangeSetting rangeSetting : batchByRange.keySet()) { + Map>> rangeBatch = batchByRange.get(rangeSetting); + if (rangeSetting.hasInProcReplica() || rangeSetting.allReplicas.size() == 1) { + // build-in or single replica + batchByReplica.put(new KVRangeReplica( + rangeSetting.id, + rangeSetting.ver, + rangeSetting.randomReplica()), rangeBatch); + } else { + // bind replica based on tenant, topic and publisher + for (GlobalTopic globalTopic : rangeBatch.keySet()) { + Map> rangePublisherMsgs = rangeBatch.get(globalTopic); + String tenantId = globalTopic.tenantId; + String topic = globalTopic.topic; + for (ClientInfo publisher : rangePublisherMsgs.keySet()) { + int hash = Objects.hash(tenantId, topic, publisher); + int replicaIdx = Math.abs(hash) % rangeSetting.allReplicas.size(); + // replica bind + batchByReplica.computeIfAbsent(new KVRangeReplica( + rangeSetting.id, + rangeSetting.ver, + rangeSetting.allReplicas.get(replicaIdx)), k -> new HashMap<>()) + .computeIfAbsent(globalTopic, k -> new HashMap<>()) + .put(publisher, rangePublisherMsgs.get(publisher)); + } + } + } + } + return batchByReplica; + } + } + + private record GlobalTopic(String tenantId, String topic) { + } private record KVRangeReplica(KVRangeId id, long ver, String storeId) {