From 5ed4b824ebc8ae42633d437fcdd5e9ad750eb925 Mon Sep 17 00:00:00 2001 From: Yonny Hao Date: Wed, 4 Sep 2024 17:06:12 +0800 Subject: [PATCH] refactoring BaseKVStoreClient to support custom routing mechanism --- .../basekv/client/BaseKVStoreClient.java | 307 ++++++++---------- .../basekv/client/IBaseKVStoreClient.java | 8 +- .../bifromq/basekv/client/IKVRangeRouter.java | 25 -- .../bifromq/basekv/client/KVRangeRouter.java | 121 ------- .../basekv/client/KVRangeRouterUtil.java | 80 +++++ .../scheduler/MutationCallScheduler.java | 6 +- .../client/scheduler/QueryCallScheduler.java | 6 +- ...erTest.java => KVRangeRouterUtilTest.java} | 98 +++--- .../scheduler/BatchMutationCallTest.java | 31 +- .../client/scheduler/BatchQueryCallTest.java | 26 +- .../basekv/client/scheduler/Fixtures.java | 3 + .../server/scheduler/DistCallScheduler.java | 5 +- .../bifromq/dist/worker/DistWorkerTest.java | 10 +- .../inbox/store/gc/InboxStoreGCProcessor.java | 10 +- .../inbox/store/gc/InboxGCProcessorTest.java | 59 +++- .../bifromq/inbox/store/InboxStoreTest.java | 10 +- .../store/gc/RetainStoreGCProcessor.java | 9 +- .../store/gc/RetainStoreGCProcessorTest.java | 72 ++-- .../bifromq/retain/store/RetainStoreTest.java | 10 +- 19 files changed, 440 insertions(+), 456 deletions(-) delete mode 100644 base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IKVRangeRouter.java delete mode 100644 base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouter.java create mode 100644 base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouterUtil.java rename base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/{KVRangeRouterTest.java => KVRangeRouterUtilTest.java} (69%) diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/BaseKVStoreClient.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/BaseKVStoreClient.java index d2526b9f4..6f01a2d0d 100644 --- a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/BaseKVStoreClient.java +++ b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/BaseKVStoreClient.java @@ -26,8 +26,10 @@ import static com.baidu.bifromq.basekv.store.proto.BaseKVStoreServiceGrpc.getRecoverMethod; import static com.baidu.bifromq.basekv.store.proto.BaseKVStoreServiceGrpc.getSplitMethod; import static com.baidu.bifromq.basekv.store.proto.BaseKVStoreServiceGrpc.getTransferLeadershipMethod; -import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; +import static com.baidu.bifromq.basekv.utils.DescriptorUtil.getEffectiveEpoch; +import static com.baidu.bifromq.basekv.utils.DescriptorUtil.toLeaderRanges; import static java.util.Collections.emptyMap; +import static java.util.Collections.emptyNavigableMap; import com.baidu.bifromq.basecrdt.core.api.IORMap; import com.baidu.bifromq.basecrdt.service.ICRDTService; @@ -37,6 +39,7 @@ import com.baidu.bifromq.basekv.proto.KVRangeDescriptor; import com.baidu.bifromq.basekv.proto.KVRangeId; import com.baidu.bifromq.basekv.proto.KVRangeStoreDescriptor; +import com.baidu.bifromq.basekv.raft.proto.RaftNodeStatus; import com.baidu.bifromq.basekv.store.proto.BootstrapReply; import com.baidu.bifromq.basekv.store.proto.BootstrapRequest; import com.baidu.bifromq.basekv.store.proto.ChangeReplicaConfigReply; @@ -54,13 +57,14 @@ import com.baidu.bifromq.basekv.store.proto.ReplyCode; import com.baidu.bifromq.basekv.store.proto.TransferLeadershipReply; import com.baidu.bifromq.basekv.store.proto.TransferLeadershipRequest; +import com.baidu.bifromq.basekv.utils.DescriptorUtil; +import com.baidu.bifromq.basekv.utils.KeySpaceDAG; import com.baidu.bifromq.baserpc.BluePrint; import com.baidu.bifromq.baserpc.IRPCClient; import com.baidu.bifromq.baserpc.exception.ServerNotFoundException; import com.baidu.bifromq.baserpc.utils.BehaviorSubject; import com.baidu.bifromq.logger.SiftLogger; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; @@ -69,15 +73,16 @@ import io.reactivex.rxjava3.disposables.CompositeDisposable; import io.reactivex.rxjava3.subjects.Subject; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.slf4j.Logger; @@ -92,7 +97,6 @@ private record ClusterInfo(Map storeDescriptors, private final ICRDTService crdtService; private final AtomicBoolean closed = new AtomicBoolean(); private final CompositeDisposable disposables = new CompositeDisposable(); - private final KVRangeRouter rangeRouter; private final int queryPipelinesPerStore; private final IORMap storeDescriptorCRDT; private final MethodDescriptor bootstrapMethod; @@ -106,6 +110,8 @@ private record ClusterInfo(Map storeDescriptors, private final MethodDescriptor queryMethod; private final Subject> storeToServerSubject = BehaviorSubject.createDefault(Maps.newHashMap()); private final Observable clusterInfoObservable; + private final BehaviorSubject> effectiveRouterSubject = + BehaviorSubject.createDefault(emptyNavigableMap()); // key: serverId, val: storeId private volatile Map serverToStoreMap = Maps.newHashMap(); @@ -121,7 +127,6 @@ private record ClusterInfo(Map storeDescriptors, BaseKVStoreClient(BaseKVStoreClientBuilder builder) { this.clusterId = builder.clusterId; log = SiftLogger.getLogger(BaseKVStoreClient.class, "clusterId", clusterId); - rangeRouter = new KVRangeRouter(clusterId); BluePrint bluePrint = RPCBluePrint.build(clusterId); this.bootstrapMethod = bluePrint.methodDesc( toScopedFullMethodName(clusterId, getBootstrapMethod().getFullMethodName())); @@ -171,6 +176,13 @@ private record ClusterInfo(Map storeDescriptors, return complete; }); disposables.add(clusterInfoObservable.subscribe(this::refresh)); + disposables.add(effectiveRouterSubject.subscribe(router -> { + if (!router.isEmpty()) { + synchronized (this) { + this.notifyAll(); + } + } + })); } @Override @@ -189,13 +201,13 @@ public Observable> describe() { } @Override - public Optional findByKey(ByteString key) { - return rangeRouter.findByKey(key); + public Observable> effectiveRouter() { + return effectiveRouterSubject.distinctUntilChanged(); } @Override - public List findByBoundary(Boundary boundary) { - return rangeRouter.findByBoundary(boundary); + public NavigableMap latestEffectiveRouter() { + return effectiveRouterSubject.getValue(); } @Override @@ -335,29 +347,31 @@ public CompletableFuture linearizedQuery(String storeId, KVRange @Override public IMutationPipeline createMutationPipeline(String storeId) { - return new ManagedMutationPipeline(storeToServerSubject.map(m -> { - String serverId = m.get(storeId); - if (serverId == null) { - return new IRPCClient.IRequestPipeline<>() { - @Override - public boolean isClosed() { - return false; - } - - @Override - public CompletableFuture invoke(KVRangeRWRequest req) { - return CompletableFuture.failedFuture( - new ServerNotFoundException("No hosting server found for store: " + storeId)); - } - - @Override - public void close() { - - } - }; - } - return rpcClient.createRequestPipeline("", serverId, null, emptyMap(), executeMethod); - })); + return new ManagedMutationPipeline(storeToServerSubject + .map(m -> m.get(storeId)) + .distinctUntilChanged() + .map(serverId -> { + if (serverId == null) { + return new IRPCClient.IRequestPipeline<>() { + @Override + public boolean isClosed() { + return false; + } + + @Override + public CompletableFuture invoke(KVRangeRWRequest req) { + return CompletableFuture.failedFuture( + new ServerNotFoundException("No hosting server found for store: " + storeId)); + } + + @Override + public void close() { + + } + }; + } + return rpcClient.createRequestPipeline("", serverId, null, emptyMap(), executeMethod); + })); } @Override @@ -371,42 +385,44 @@ public IQueryPipeline createLinearizedQueryPipeline(String storeId) { } private IQueryPipeline createQueryPipeline(String storeId, boolean linearized) { - return new ManagedQueryPipeline(storeToServerSubject.map(m -> { - String serverId = m.get(storeId); - if (serverId == null) { - return new IRPCClient.IRequestPipeline<>() { - @Override - public boolean isClosed() { - return false; - } - - @Override - public CompletableFuture invoke(KVRangeRORequest req) { - return CompletableFuture.failedFuture( - new ServerNotFoundException("No hosting server found for store: " + storeId)); - } - - @Override - public void close() { - - } - }; - } - if (linearized) { - return rpcClient.createRequestPipeline("", serverId, null, emptyMap(), linearizedQueryMethod); - } else { - return rpcClient.createRequestPipeline("", serverId, null, emptyMap(), queryMethod); - } - })); + return new ManagedQueryPipeline(storeToServerSubject + .map(m -> m.get(storeId)) + .distinctUntilChanged() + .map(serverId -> { + if (serverId == null) { + return new IRPCClient.IRequestPipeline<>() { + @Override + public boolean isClosed() { + return false; + } + + @Override + public CompletableFuture invoke(KVRangeRORequest req) { + return CompletableFuture.failedFuture( + new ServerNotFoundException("No hosting server found for store: " + storeId)); + } + + @Override + public void close() { + + } + }; + } + if (linearized) { + return rpcClient.createRequestPipeline("", serverId, null, emptyMap(), linearizedQueryMethod); + } else { + return rpcClient.createRequestPipeline("", serverId, null, emptyMap(), queryMethod); + } + })); } @Override public void join() { // wait for router covering full range - if (!rangeRouter.isFullRangeCovered()) { + if (effectiveRouterSubject.getValue() == null || effectiveRouterSubject.getValue().isEmpty()) { synchronized (this) { try { - if (!rangeRouter.isFullRangeCovered()) { + if (effectiveRouterSubject.getValue().isEmpty()) { this.wait(); } } catch (InterruptedException e) { @@ -455,22 +471,30 @@ private void refresh(ClusterInfo clusterInfo) { boolean rangeRouteUpdated = refreshRangeRoute(clusterInfo); boolean storeRouteUpdated = refreshStoreRoute(clusterInfo); if (storeRouteUpdated) { - refreshQueryPipelines(storeToServerMap); + refreshQueryPipelines(clusterInfo.storeDescriptors.keySet()); } if (rangeRouteUpdated || storeRouteUpdated) { - refreshMutPipelines(storeToServerMap, clusterInfo.storeDescriptors); - } - if (rangeRouteUpdated) { - if (rangeRouter.isFullRangeCovered()) { - synchronized (this) { - this.notifyAll(); - } - } + refreshMutPipelines(clusterInfo.storeDescriptors); } } private boolean refreshRangeRoute(ClusterInfo clusterInfo) { - return rangeRouter.reset(Sets.newHashSet(clusterInfo.storeDescriptors.values())); + Optional effectiveEpoch = + getEffectiveEpoch(Sets.newHashSet(clusterInfo.storeDescriptors.values())); + if (effectiveEpoch.isEmpty()) { + return false; + } + Map> leaderRanges = + toLeaderRanges(effectiveEpoch.get().storeDescriptors()); + KeySpaceDAG dag = new KeySpaceDAG(leaderRanges); + NavigableMap router = Maps.transformValues(dag.getEffectiveFullCoveredRoute(), + leaderRange -> new KVRangeSetting(clusterId, leaderRange.storeId(), leaderRange.descriptor())); + if (router.isEmpty()) { + return false; + } + NavigableMap last = effectiveRouterSubject.getValue(); + effectiveRouterSubject.onNext(router); + return !router.equals(last); } private boolean refreshStoreRoute(ClusterInfo clusterInfo) { @@ -486,121 +510,52 @@ private boolean refreshStoreRoute(ClusterInfo clusterInfo) { return true; } - private void refreshMutPipelines(Map newStoreToServerMap, - Map storeDescriptorMap) { - Map> nextMutPplns = Maps.newHashMap(mutPplns); - nextMutPplns.forEach((k, v) -> nextMutPplns.put(k, Maps.newHashMap(v))); - Set oldStoreIds = Sets.newHashSet(mutPplns.keySet()); - for (String existingStoreId : Sets.intersection(newStoreToServerMap.keySet(), oldStoreIds)) { - Set newRangeIds = storeDescriptorMap.get(existingStoreId).getRangesList().stream() - .map(KVRangeDescriptor::getId).collect(Collectors.toSet()); - Set oldRangeIds = Sets.newHashSet(nextMutPplns.get(existingStoreId).keySet()); - for (KVRangeId newRangeId : Sets.difference(newRangeIds, oldRangeIds)) { - nextMutPplns.get(existingStoreId) - .put(newRangeId, new MutPipeline(newStoreToServerMap.get(existingStoreId))); - } - for (KVRangeId deadRangeId : Sets.difference(oldRangeIds, newRangeIds)) { - nextMutPplns.get(existingStoreId).remove(deadRangeId).close(); - } - } - Set effectiveKVRangeIds = rangeRouter.findByBoundary(FULL_BOUNDARY).stream() - .map(m -> m.id).collect(Collectors.toSet()); - for (String newStoreId : Sets.difference(newStoreToServerMap.keySet(), oldStoreIds)) { - Map rangeExecPplns = - nextMutPplns.computeIfAbsent(newStoreId, k -> new HashMap<>()); - for (KVRangeDescriptor rangeDesc : storeDescriptorMap.get(newStoreId).getRangesList()) { - if (effectiveKVRangeIds.contains(rangeDesc.getId())) { - // only create mutation pipelines for effective ranges - rangeExecPplns.put(rangeDesc.getId(), new MutPipeline(newStoreToServerMap.get(newStoreId))); + private void refreshMutPipelines(Map storeDescriptors) { + Map> nextMutPplns = new HashMap<>(); + Map> currentMutPplns = mutPplns; + + for (KVRangeStoreDescriptor storeDescriptor : storeDescriptors.values()) { + String storeId = storeDescriptor.getId(); + for (KVRangeDescriptor rangeDescriptor : storeDescriptor.getRangesList()) { + if (rangeDescriptor.getRole() != RaftNodeStatus.Leader) { + continue; + } + KVRangeId rangeId = rangeDescriptor.getId(); + Map currentRanges = + currentMutPplns.getOrDefault(storeId, Collections.emptyMap()); + IMutationPipeline existingPpln = currentRanges.get(rangeId); + if (existingPpln != null) { + nextMutPplns.computeIfAbsent(storeId, k -> new HashMap<>()).put(rangeId, existingPpln); + } else { + nextMutPplns.computeIfAbsent(storeId, k -> new HashMap<>()) + .put(rangeId, createMutationPipeline(storeId)); } } } - - for (String deadStoreId : Sets.difference(oldStoreIds, newStoreToServerMap.keySet())) { - nextMutPplns.remove(deadStoreId).values().forEach(IMutationPipeline::close); - } mutPplns = nextMutPplns; - } - - private void refreshQueryPipelines(Map newStoreToServerMap) { - if (newStoreToServerMap.keySet().equals(queryPplns.keySet())) { - return; - } - Set oldStoreIds = Sets.newHashSet(queryPplns.keySet()); - // query pipelines - Map> nextQueryPplns = Maps.newHashMap(queryPplns); - nextQueryPplns.forEach((k, v) -> nextQueryPplns.put(k, Lists.newArrayList(v))); - // lnr query pipelines - Map> nextLnrQueryPplns = Maps.newHashMap(lnrQueryPplns); - nextQueryPplns.forEach((k, v) -> nextLnrQueryPplns.put(k, Lists.newArrayList(v))); - - for (String newStoreId : Sets.difference(newStoreToServerMap.keySet(), oldStoreIds)) { - // setup new query pipelines - List storeQueryPplns = nextQueryPplns.computeIfAbsent(newStoreId, k -> new ArrayList<>()); - IntStream.range(0, queryPipelinesPerStore) - .forEach(i -> storeQueryPplns.add(new QueryPipeline(newStoreToServerMap.get(newStoreId)))); - - // setup new linear query pipelines - List storeLnrQueryPplns = - nextLnrQueryPplns.computeIfAbsent(newStoreId, k -> new ArrayList<>()); - IntStream.range(0, queryPipelinesPerStore) - .forEach(i -> storeLnrQueryPplns.add(new QueryPipeline(newStoreToServerMap.get(newStoreId), true))); + // clear mut pipelines targeting non-exist storeId; + for (String storeId : Sets.difference(currentMutPplns.keySet(), nextMutPplns.keySet())) { + currentMutPplns.get(storeId).values().forEach(IMutationPipeline::close); } - for (String deadStoreId : Sets.difference(oldStoreIds, newStoreToServerMap.keySet())) { - // close store pipelines - nextQueryPplns.remove(deadStoreId).forEach(IQueryPipeline::close); - nextLnrQueryPplns.remove(deadStoreId).forEach(IQueryPipeline::close); - } - queryPplns = nextQueryPplns; - lnrQueryPplns = nextLnrQueryPplns; } - private class MutPipeline implements IMutationPipeline { - private final IRPCClient.IRequestPipeline ppln; - - MutPipeline(String serverId) { - ppln = rpcClient.createRequestPipeline("", serverId, null, emptyMap(), executeMethod); - } - - @Override - public CompletableFuture execute(KVRangeRWRequest request) { - log.trace("Requesting rw range:req={}", request); - return ppln.invoke(request); - } - - @Override - public void close() { - ppln.close(); - } - } - - private class QueryPipeline implements IQueryPipeline { - private final IRPCClient.IRequestPipeline ppln; - private final boolean linearized; - - QueryPipeline(String serverId) { - this(serverId, false); - } - - QueryPipeline(String serverId, boolean linearized) { - this.linearized = linearized; - if (linearized) { - ppln = rpcClient.createRequestPipeline("", serverId, null, emptyMap(), linearizedQueryMethod); + private void refreshQueryPipelines(Set allStoreIds) { + Map> nextQueryPplns = new HashMap<>(); + Map> currentQueryPplns = queryPplns; + for (String storeId : allStoreIds) { + if (currentQueryPplns.containsKey(storeId)) { + nextQueryPplns.put(storeId, currentQueryPplns.get(storeId)); } else { - ppln = rpcClient.createRequestPipeline("", serverId, null, - emptyMap(), queryMethod); + List queryPipelines = new ArrayList<>(queryPipelinesPerStore); + IntStream.range(0, queryPipelinesPerStore) + .forEach(i -> queryPipelines.add(createQueryPipeline(storeId))); + nextQueryPplns.put(storeId, queryPipelines); } } - - @Override - public CompletableFuture query(KVRangeRORequest request) { - log.trace("Invoke ro range request: linearized={} \n{}", linearized, request); - return ppln.invoke(request); - } - - @Override - public void close() { - ppln.close(); + queryPplns = nextQueryPplns; + // clear query pipelines targeting non-exist storeId; + for (String storeId : Sets.difference(currentQueryPplns.keySet(), allStoreIds)) { + currentQueryPplns.get(storeId).forEach(IQueryPipeline::close); } } } diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IBaseKVStoreClient.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IBaseKVStoreClient.java index 0466c20ad..af03eecca 100644 --- a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IBaseKVStoreClient.java +++ b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IBaseKVStoreClient.java @@ -13,6 +13,7 @@ package com.baidu.bifromq.basekv.client; +import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.basekv.proto.KVRangeStoreDescriptor; import com.baidu.bifromq.basekv.store.proto.BootstrapReply; import com.baidu.bifromq.basekv.store.proto.BootstrapRequest; @@ -32,10 +33,11 @@ import com.baidu.bifromq.basekv.store.proto.TransferLeadershipRequest; import com.baidu.bifromq.baserpc.IConnectable; import io.reactivex.rxjava3.core.Observable; +import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.CompletableFuture; -public interface IBaseKVStoreClient extends IKVRangeRouter, IConnectable { +public interface IBaseKVStoreClient extends IConnectable { static BaseKVStoreClientBuilder newBuilder() { return new BaseKVStoreClientBuilder(); } @@ -44,6 +46,10 @@ static BaseKVStoreClientBuilder newBuilder() { Observable> describe(); + Observable> effectiveRouter(); + + NavigableMap latestEffectiveRouter(); + CompletableFuture bootstrap(String storeId, BootstrapRequest request); CompletableFuture recover(String storeId, RecoverRequest request); diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IKVRangeRouter.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IKVRangeRouter.java deleted file mode 100644 index 57b9bf7d5..000000000 --- a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/IKVRangeRouter.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.basekv.client; - -import com.baidu.bifromq.basekv.proto.Boundary; -import com.google.protobuf.ByteString; -import java.util.List; -import java.util.Optional; - -public interface IKVRangeRouter { - Optional findByKey(ByteString key); - - List findByBoundary(Boundary boundary); -} diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouter.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouter.java deleted file mode 100644 index c7871dd76..000000000 --- a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouter.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.baidu.bifromq.basekv.client; - -import static com.baidu.bifromq.basekv.utils.BoundaryUtil.compareEndKeys; -import static com.baidu.bifromq.basekv.utils.BoundaryUtil.endKey; -import static com.baidu.bifromq.basekv.utils.DescriptorUtil.toLeaderRanges; -import static com.baidu.bifromq.basekv.utils.DescriptorUtil.getEffectiveEpoch; - -import com.baidu.bifromq.basekv.proto.Boundary; -import com.baidu.bifromq.basekv.proto.KVRangeDescriptor; -import com.baidu.bifromq.basekv.proto.KVRangeId; -import com.baidu.bifromq.basekv.proto.KVRangeStoreDescriptor; -import com.baidu.bifromq.basekv.utils.BoundaryUtil; -import com.baidu.bifromq.basekv.utils.DescriptorUtil; -import com.baidu.bifromq.basekv.utils.KeySpaceDAG; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Optional; -import java.util.Set; -import java.util.TreeMap; - -public class KVRangeRouter implements IKVRangeRouter { - private final String clusterId; - private volatile NavigableMap router = new TreeMap<>(BoundaryUtil::compare); - - - public KVRangeRouter(String clusterId) { - this.clusterId = clusterId; - } - - /** - * Reset the router with the latest store descriptors if it contains qualified keyspace. - * - * @param storeDescriptors the latest store descriptors - * @return true if the router is changed, otherwise false - */ - public boolean reset(Set storeDescriptors) { - Optional effectiveEpoch = getEffectiveEpoch(storeDescriptors); - if (effectiveEpoch.isEmpty()) { - return false; - } - Map> leaderRanges = - toLeaderRanges(effectiveEpoch.get().storeDescriptors()); - KeySpaceDAG dag = new KeySpaceDAG(leaderRanges); - NavigableMap router = Maps.transformValues(dag.getEffectiveFullCoveredRoute(), - leaderRange -> new KVRangeSetting(clusterId, leaderRange.storeId(), leaderRange.descriptor())); - if (router.isEmpty()) { - return false; - } - boolean changed = !this.router.equals(router); - if (changed) { - this.router = router; - } - return changed; - } - - public boolean isFullRangeCovered() { - return !router.isEmpty(); - } - - @Override - public Optional findByKey(ByteString key) { - Map.Entry entry = router.floorEntry(Boundary.newBuilder().setStartKey(key).build()); - if (entry != null) { - KVRangeSetting setting = entry.getValue(); - if (BoundaryUtil.inRange(key, entry.getKey())) { - return Optional.of(setting); - } - } - return Optional.empty(); - } - - @Override - public List findByBoundary(Boundary boundary) { - if (!boundary.hasStartKey() && !boundary.hasEndKey()) { - return Lists.newArrayList(router.values()); - } - if (!boundary.hasStartKey()) { - Boundary boundaryEnd = Boundary.newBuilder() - .setStartKey(boundary.getEndKey()) - .setEndKey(boundary.getEndKey()).build(); - return Lists.newArrayList(router.headMap(boundaryEnd, false).values()); - } - if (!boundary.hasEndKey()) { - Boundary boundaryStart = Boundary.newBuilder() - .setStartKey(boundary.getStartKey()) - .setEndKey(boundary.getStartKey()).build(); - Boundary floorBoundary = router.floorKey(boundaryStart); - return Lists.newArrayList( - router.tailMap(floorBoundary, compareEndKeys(endKey(floorBoundary), boundary.getStartKey()) > 0) - .values()); - } - Boundary boundaryStart = Boundary.newBuilder() - .setStartKey(boundary.getStartKey()) - .setEndKey(boundary.getStartKey()).build(); - Boundary boundaryEnd = Boundary.newBuilder() - .setStartKey(boundary.getEndKey()) - .setEndKey(boundary.getEndKey()).build(); - Boundary floorBoundary = router.floorKey(boundaryStart); - - return Lists.newArrayList( - router.subMap(floorBoundary, compareEndKeys(endKey(floorBoundary), boundary.getStartKey()) > 0, - boundaryEnd, false).values()); - } -} diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouterUtil.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouterUtil.java new file mode 100644 index 000000000..80cebb89e --- /dev/null +++ b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/KVRangeRouterUtil.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.basekv.client; + +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.compareEndKeys; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.endKey; + +import com.baidu.bifromq.basekv.proto.Boundary; +import com.baidu.bifromq.basekv.utils.BoundaryUtil; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; + +public class KVRangeRouterUtil { + + public static Optional findByKey(ByteString key, + NavigableMap effectiveRouter) { + Map.Entry entry = + effectiveRouter.floorEntry(Boundary.newBuilder().setStartKey(key).build()); + if (entry != null) { + KVRangeSetting setting = entry.getValue(); + if (BoundaryUtil.inRange(key, entry.getKey())) { + return Optional.of(setting); + } + } + return Optional.empty(); + } + + public static List findByBoundary(Boundary boundary, + NavigableMap effectiveRouter) { + if (effectiveRouter.isEmpty()) { + return Collections.emptyList(); + } + if (!boundary.hasStartKey() && !boundary.hasEndKey()) { + return Lists.newArrayList(effectiveRouter.values()); + } + if (!boundary.hasStartKey()) { + Boundary boundaryEnd = Boundary.newBuilder() + .setStartKey(boundary.getEndKey()) + .setEndKey(boundary.getEndKey()).build(); + return Lists.newArrayList(effectiveRouter.headMap(boundaryEnd, false).values()); + } + if (!boundary.hasEndKey()) { + Boundary boundaryStart = Boundary.newBuilder() + .setStartKey(boundary.getStartKey()) + .setEndKey(boundary.getStartKey()).build(); + Boundary floorBoundary = effectiveRouter.floorKey(boundaryStart); + return Lists.newArrayList( + effectiveRouter.tailMap(floorBoundary, + compareEndKeys(endKey(floorBoundary), boundary.getStartKey()) > 0) + .values()); + } + Boundary boundaryStart = Boundary.newBuilder() + .setStartKey(boundary.getStartKey()) + .setEndKey(boundary.getStartKey()).build(); + Boundary boundaryEnd = Boundary.newBuilder() + .setStartKey(boundary.getEndKey()) + .setEndKey(boundary.getEndKey()).build(); + Boundary floorBoundary = effectiveRouter.floorKey(boundaryStart); + + return Lists.newArrayList( + effectiveRouter.subMap(floorBoundary, compareEndKeys(endKey(floorBoundary), boundary.getStartKey()) > 0, + boundaryEnd, false).values()); + } +} diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/MutationCallScheduler.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/MutationCallScheduler.java index ef07484cd..8a639e1ca 100644 --- a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/MutationCallScheduler.java +++ b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/MutationCallScheduler.java @@ -13,8 +13,10 @@ package com.baidu.bifromq.basekv.client.scheduler; -import com.baidu.bifromq.basekv.client.KVRangeSetting; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; + import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basescheduler.BatchCallScheduler; import com.google.protobuf.ByteString; import java.time.Duration; @@ -45,7 +47,7 @@ public MutationCallScheduler(String name, @Override protected final Optional find(ReqT subCall) { - Optional rangeSetting = storeClient.findByKey(rangeKey(subCall)); + Optional rangeSetting = findByKey(rangeKey(subCall), storeClient.latestEffectiveRouter()); return rangeSetting.map(setting -> new MutationCallBatcherKey(setting.id, setting.leader, setting.ver)); } diff --git a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/QueryCallScheduler.java b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/QueryCallScheduler.java index c85ead636..f60e6f754 100644 --- a/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/QueryCallScheduler.java +++ b/base-kv/base-kv-store-client/src/main/java/com/baidu/bifromq/basekv/client/scheduler/QueryCallScheduler.java @@ -13,8 +13,10 @@ package com.baidu.bifromq.basekv.client.scheduler; -import com.baidu.bifromq.basekv.client.KVRangeSetting; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; + import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basescheduler.BatchCallScheduler; import com.google.protobuf.ByteString; import java.time.Duration; @@ -51,7 +53,7 @@ protected String selectStore(KVRangeSetting setting, ReqT request) { @Override protected final Optional find(ReqT req) { - return storeClient.findByKey(rangeKey(req)).map( + return findByKey(rangeKey(req), storeClient.latestEffectiveRouter()).map( range -> new QueryCallBatcherKey(range.id, selectStore(range, req), selectQueue(req), range.ver)); } diff --git a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/KVRangeRouterTest.java b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/KVRangeRouterUtilTest.java similarity index 69% rename from base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/KVRangeRouterTest.java rename to base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/KVRangeRouterUtilTest.java index 762e468ab..5cca50188 100644 --- a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/KVRangeRouterTest.java +++ b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/KVRangeRouterUtilTest.java @@ -13,6 +13,8 @@ package com.baidu.bifromq.basekv.client; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.EMPTY_BOUNDARY; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static org.testng.Assert.assertEquals; @@ -22,23 +24,17 @@ import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.basekv.proto.KVRangeDescriptor; import com.baidu.bifromq.basekv.proto.KVRangeId; -import com.baidu.bifromq.basekv.proto.KVRangeStoreDescriptor; import com.baidu.bifromq.basekv.raft.proto.RaftNodeStatus; +import com.baidu.bifromq.basekv.utils.BoundaryUtil; import com.google.protobuf.ByteString; +import java.util.Collections; import java.util.List; +import java.util.NavigableMap; import java.util.Optional; -import java.util.Set; -import org.testng.annotations.BeforeMethod; +import java.util.TreeMap; import org.testng.annotations.Test; -public class KVRangeRouterTest { - - private KVRangeRouter router; - - @BeforeMethod - public void setUp() { - router = new KVRangeRouter("testCluster"); - } +public class KVRangeRouterUtilTest { @Test public void reset() { @@ -47,18 +43,19 @@ public void reset() { @Test public void emptyRouter() { - assertFalse(router.findByKey(ByteString.copyFromUtf8("a")).isPresent()); - assertTrue(router.findByBoundary(FULL_BOUNDARY).isEmpty()); + assertFalse(findByKey(ByteString.copyFromUtf8("a"), Collections.emptyNavigableMap()).isPresent()); + assertTrue(findByBoundary(FULL_BOUNDARY, Collections.emptyNavigableMap()).isEmpty()); } @Test - public void findByKey() { + public void testFindByKey() { // Prepare test data with full space coverage KVRangeDescriptor range1 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(1).build()) .setRole(RaftNodeStatus.Leader) .setBoundary(Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("b")).build()) // (null, "b") .build(); + KVRangeSetting rangeSetting1 = new KVRangeSetting("testCluster", "V1", range1); KVRangeDescriptor range2 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(2).build()) @@ -68,6 +65,7 @@ public void findByKey() { .setEndKey(ByteString.copyFromUtf8("c")) .build()) // ["b", "c") .build(); + KVRangeSetting rangeSetting2 = new KVRangeSetting("testCluster", "V1", range2); KVRangeDescriptor range3 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(3).build()) @@ -77,6 +75,8 @@ public void findByKey() { .setEndKey(ByteString.copyFromUtf8("d")) .build()) // ["c", "d") .build(); + KVRangeSetting rangeSetting3 = new KVRangeSetting("testCluster", "V1", range3); + KVRangeDescriptor range4 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(4).build()) @@ -85,51 +85,51 @@ public void findByKey() { .setStartKey(ByteString.copyFromUtf8("d")) .build()) // ["d", null) .build(); + KVRangeSetting rangeSetting4 = new KVRangeSetting("testCluster", "V1", range4); - KVRangeStoreDescriptor storeDescriptor = KVRangeStoreDescriptor.newBuilder() - .setId("store1") - .addRanges(range1) - .addRanges(range2) - .addRanges(range3) - .addRanges(range4) - .build(); + NavigableMap router = new TreeMap<>(BoundaryUtil::compare); + router.put(range1.getBoundary(), rangeSetting1); + router.put(range2.getBoundary(), rangeSetting2); + router.put(range3.getBoundary(), rangeSetting3); + router.put(range4.getBoundary(), rangeSetting4); - assertTrue(router.reset(Set.of(storeDescriptor))); // Test find by key within the first range - Optional result1 = router.findByKey(ByteString.copyFromUtf8("a")); + Optional result1 = findByKey(ByteString.copyFromUtf8("a"), router); assertTrue(result1.isPresent()); assertEquals(result1.get().id.getId(), 1L); // Test find by key within the second range - Optional result2 = router.findByKey(ByteString.copyFromUtf8("b")); + Optional result2 = findByKey(ByteString.copyFromUtf8("b"), router); assertTrue(result2.isPresent()); assertEquals(result2.get().id.getId(), 2L); // Test find by key within the third range - Optional result3 = router.findByKey(ByteString.copyFromUtf8("c")); + Optional result3 = findByKey(ByteString.copyFromUtf8("c"), router); assertTrue(result3.isPresent()); assertEquals(result3.get().id.getId(), 3L); // Test find by key within the fourth range - Optional result4 = router.findByKey(ByteString.copyFromUtf8("d")); + Optional result4 = findByKey(ByteString.copyFromUtf8("d"), router); assertTrue(result4.isPresent()); assertEquals(result4.get().id.getId(), 4L); // Test find by key not in any range - Optional result5 = router.findByKey(ByteString.copyFromUtf8("z")); + Optional result5 = findByKey(ByteString.copyFromUtf8("z"), router); assertTrue(result5.isPresent()); assertEquals(result5.get().id.getId(), 4L); } @Test - public void findByBoundary() { + public void testFindByBoundary() { // Prepare test data with full space coverage KVRangeDescriptor range1 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(1).build()) .setRole(RaftNodeStatus.Leader) .setBoundary(Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("b")).build()) // (null, "b") .build(); + KVRangeSetting rangeSetting1 = new KVRangeSetting("testCluster", "V1", range1); + KVRangeDescriptor range2 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(2).build()) @@ -139,6 +139,8 @@ public void findByBoundary() { .setEndKey(ByteString.copyFromUtf8("c")) .build()) // ["b", "c") .build(); + KVRangeSetting rangeSetting2 = new KVRangeSetting("testCluster", "V1", range2); + KVRangeDescriptor range3 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(3).build()) @@ -148,6 +150,7 @@ public void findByBoundary() { .setEndKey(ByteString.copyFromUtf8("d")) .build()) // ["c", "d") .build(); + KVRangeSetting rangeSetting3 = new KVRangeSetting("testCluster", "V1", range3); KVRangeDescriptor range4 = KVRangeDescriptor.newBuilder() .setId(KVRangeId.newBuilder().setId(4).build()) @@ -156,59 +159,58 @@ public void findByBoundary() { .setStartKey(ByteString.copyFromUtf8("d")) .build()) // ["d", null) .build(); + KVRangeSetting rangeSetting4 = new KVRangeSetting("testCluster", "V1", range4); + + NavigableMap router = new TreeMap<>(BoundaryUtil::compare); + router.put(range1.getBoundary(), rangeSetting1); + router.put(range2.getBoundary(), rangeSetting2); + router.put(range3.getBoundary(), rangeSetting3); + router.put(range4.getBoundary(), rangeSetting4); - KVRangeStoreDescriptor storeDescriptor = KVRangeStoreDescriptor.newBuilder() - .setId("store1") - .addRanges(range1) - .addRanges(range2) - .addRanges(range3) - .addRanges(range4) - .build(); - router.reset(Set.of(storeDescriptor)); - List result0 = router.findByBoundary( - Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("a")).build()); // (null, "a") + List result0 = findByBoundary( + Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("a")).build(), router); // (null, "a") assertEquals(result0.size(), 1); assertEquals(result0.get(0).id.getId(), 1L); // Test find by exact boundary of the first range - List result1 = router.findByBoundary( - Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("b")).build()); // (null, "b") + List result1 = findByBoundary( + Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("b")).build(), router); // (null, "b") assertEquals(result1.size(), 1); assertEquals(result1.get(0).id.getId(), 1L); // Test find by overlapping boundary with the second range - List result2 = router.findByBoundary( - Boundary.newBuilder().setStartKey(ByteString.copyFromUtf8("b")).build()); // ["b", null) + List result2 = findByBoundary( + Boundary.newBuilder().setStartKey(ByteString.copyFromUtf8("b")).build(), router); // ["b", null) assertEquals(result2.size(), 3); // Covers ranges 2, 3, and 4 assertEquals(result2.get(0).boundary, range2.getBoundary()); assertEquals(result2.get(1).boundary, range3.getBoundary()); assertEquals(result2.get(2).boundary, range4.getBoundary()); // Test find by a boundary that overlaps multiple ranges - List result3 = router.findByBoundary( + List result3 = findByBoundary( Boundary.newBuilder().setStartKey(ByteString.copyFromUtf8("b")) .setEndKey(ByteString.copyFromUtf8("d")) - .build()); // ["b", "d") + .build(), router); // ["b", "d") assertEquals(result3.size(), 2); // Covers ranges 2 and 3 assertEquals(result3.get(0).boundary, range2.getBoundary()); assertEquals(result3.get(1).boundary, range3.getBoundary()); - List result4 = router.findByBoundary( + List result4 = findByBoundary( Boundary.newBuilder().setStartKey(ByteString.copyFromUtf8("x")) .setEndKey(ByteString.copyFromUtf8("y")) - .build()); // ["x", "y") + .build(), router); // ["x", "y") assertEquals(result4.size(), 1); assertEquals(result4.get(0).boundary, range4.getBoundary()); - List result5 = router.findByBoundary(FULL_BOUNDARY); + List result5 = findByBoundary(FULL_BOUNDARY, router); assertEquals(result5.size(), 4); assertEquals(result5.get(0).boundary, range1.getBoundary()); assertEquals(result5.get(1).boundary, range2.getBoundary()); assertEquals(result5.get(2).boundary, range3.getBoundary()); assertEquals(result5.get(3).boundary, range4.getBoundary()); - List result6 = router.findByBoundary(EMPTY_BOUNDARY); + List result6 = findByBoundary(EMPTY_BOUNDARY, router); assertEquals(result6.size(), 1); assertEquals(result6.get(0).boundary, range1.getBoundary()); } diff --git a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchMutationCallTest.java b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchMutationCallTest.java index 779860fa5..fbfc13b33 100644 --- a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchMutationCallTest.java +++ b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchMutationCallTest.java @@ -14,6 +14,7 @@ package com.baidu.bifromq.basekv.client.scheduler; import static com.baidu.bifromq.basekv.client.scheduler.Fixtures.setting; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.timeout; @@ -21,18 +22,18 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; import com.baidu.bifromq.basekv.client.IMutationPipeline; import com.baidu.bifromq.basekv.proto.KVRangeId; import com.baidu.bifromq.basekv.store.proto.KVRangeRWReply; import com.baidu.bifromq.basekv.store.proto.KVRangeRWRequest; +import com.baidu.bifromq.basekv.utils.BoundaryUtil; import com.baidu.bifromq.basekv.utils.KVRangeIdUtil; import com.google.protobuf.ByteString; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Optional; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; @@ -42,7 +43,6 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.internal.util.collections.Sets; -import org.mockito.stubbing.Answer; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -71,7 +71,10 @@ public void teardown() { @Test public void addToSameBatch() { - when(storeClient.findByKey(any())).thenReturn(Optional.of(setting(id, "V1", 0))); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V1", 0)); + }}); + when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1); when(mutationPipeline1.execute(any())) .thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build(), @@ -102,12 +105,6 @@ public void addToSameBatch() { @Test public void addToDifferentBatch() { - when(storeClient.findByKey(any())).thenAnswer((Answer>) invocation -> { - int req = Integer.parseInt(((ByteString) invocation.getArgument(0)).toStringUtf8()); - return Optional.of(req < 500 ? - setting(id, "V1", 0) : - setting(id, "V2", 0)); - }); when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1); when(storeClient.createMutationPipeline("V2")).thenReturn(mutationPipeline2); when(mutationPipeline1.execute(any())) @@ -124,6 +121,15 @@ public void addToDifferentBatch() { for (int i = 0; i < 1000; i++) { int req = ThreadLocalRandom.current().nextInt(1, 1001); reqList.add(req); + if (req < 500) { + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V1", 0)); + }}); + } else { + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V2", 0)); + }}); + } futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req))) .thenAccept((v) -> respList.add(Integer.parseInt(v.toStringUtf8())))); } @@ -134,7 +140,10 @@ public void addToDifferentBatch() { @Test public void pipelineExpiry() { - when(storeClient.findByKey(any())).thenReturn(Optional.of(setting(id, "V1", 0))); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V1", 0)); + }}); + when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1); when(mutationPipeline1.execute(any())) .thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build())); diff --git a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchQueryCallTest.java b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchQueryCallTest.java index eb781e232..d96e99184 100644 --- a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchQueryCallTest.java +++ b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/BatchQueryCallTest.java @@ -14,23 +14,26 @@ package com.baidu.bifromq.basekv.client.scheduler; import static com.baidu.bifromq.basekv.client.scheduler.Fixtures.setting; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; import com.baidu.bifromq.basekv.client.IQueryPipeline; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.proto.KVRangeId; import com.baidu.bifromq.basekv.store.proto.KVRangeROReply; +import com.baidu.bifromq.basekv.utils.BoundaryUtil; import com.baidu.bifromq.basekv.utils.KVRangeIdUtil; import com.google.protobuf.ByteString; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; @@ -70,7 +73,9 @@ public void teardown() { public void addToSameBatch() { ExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - when(storeClient.findByKey(any())).thenReturn(Optional.of(setting(id, "V1", 0))); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V1", 0)); + }}); when(storeClient.createLinearizedQueryPipeline("V1")).thenReturn(queryPipeline1); when(queryPipeline1.query(any())) .thenReturn(CompletableFuture.supplyAsync(() -> KVRangeROReply.newBuilder().build(), executor)); @@ -95,12 +100,6 @@ public void addToSameBatch() { @Test public void addToDifferentBatch() { - when(storeClient.findByKey(any())).thenAnswer((Answer>) invocation -> { - int req = Integer.parseInt(((ByteString) invocation.getArgument(0)).toStringUtf8()); - return Optional.of(req < 500 ? - setting(id, "V1", 0) : - setting(id, "V2", 0)); - }); when(storeClient.createLinearizedQueryPipeline("V1")).thenReturn(queryPipeline1); when(storeClient.createLinearizedQueryPipeline("V2")).thenReturn(queryPipeline2); ExecutorService executor1 = Executors.newSingleThreadScheduledExecutor(); @@ -121,10 +120,16 @@ public void addToDifferentBatch() { int req = ThreadLocalRandom.current().nextInt(1, 1001); if (req < 500) { reqList1.add(req); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V1", 0)); + }}); futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req))) .thenAccept((v) -> respList1.add(Integer.parseInt(v.toStringUtf8())))); } else { reqList2.add(req); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V2", 0)); + }}); futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req))) .thenAccept((v) -> respList2.add(Integer.parseInt(v.toStringUtf8())))); } @@ -141,7 +146,10 @@ public void addToDifferentBatch() { public void pipelineExpiry() { ExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - when(storeClient.findByKey(any())).thenReturn(Optional.of(setting(id, "V1", 0))); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting(id, "V1", 0)); + }}); + when(storeClient.createQueryPipeline("V1")).thenReturn(queryPipeline1); when(queryPipeline1.query(any())) .thenReturn(CompletableFuture.supplyAsync(() -> KVRangeROReply.newBuilder().build(), executor)); diff --git a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/Fixtures.java b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/Fixtures.java index e6b4662d4..7fc33eed7 100644 --- a/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/Fixtures.java +++ b/base-kv/base-kv-store-client/src/test/java/com/baidu/bifromq/basekv/client/scheduler/Fixtures.java @@ -13,6 +13,8 @@ package com.baidu.bifromq.basekv.client.scheduler; +import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; + import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.proto.KVRangeDescriptor; import com.baidu.bifromq.basekv.proto.KVRangeId; @@ -23,6 +25,7 @@ static KVRangeSetting setting(KVRangeId id, String leaderStoreId, long ver) { KVRangeDescriptor descriptor = KVRangeDescriptor.newBuilder() .setId(id) .setVer(ver) + .setBoundary(FULL_BOUNDARY) .setConfig(ClusterConfig.newBuilder() .addVoters(leaderStoreId) .build()) diff --git a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java index f90986494..fc438dea6 100644 --- a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java +++ b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/scheduler/DistCallScheduler.java @@ -13,6 +13,7 @@ package com.baidu.bifromq.dist.server.scheduler; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; import static com.baidu.bifromq.dist.entity.EntityUtil.matchRecordKeyPrefix; import static com.baidu.bifromq.dist.entity.EntityUtil.tenantUpperBound; @@ -152,10 +153,10 @@ public CompletableFuture execute() { }); DistPack distPack = distPackBuilder.build(); int fanoutScale = tenantFanoutGetter.apply(tenantId); - List ranges = distWorkerClient.findByBoundary(Boundary.newBuilder() + List ranges = findByBoundary(Boundary.newBuilder() .setStartKey(matchRecordKeyPrefix(tenantId)) .setEndKey(tenantUpperBound(tenantId)) - .build()); + .build(), distWorkerClient.latestEffectiveRouter()); if (fanoutScale > fanoutSplitThreshold) { ranges.forEach(range -> distPacksByRangeReplica.computeIfAbsent( new KVRangeReplica(range.id, range.ver, range.leader), diff --git a/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/DistWorkerTest.java b/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/DistWorkerTest.java index 769fc5730..cfde6b50a 100644 --- a/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/DistWorkerTest.java +++ b/bifromq-dist/bifromq-dist-worker/src/test/java/com/baidu/bifromq/dist/worker/DistWorkerTest.java @@ -13,6 +13,7 @@ package com.baidu.bifromq.dist.worker; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; import static com.baidu.bifromq.dist.entity.EntityUtil.toMatchRecordKey; import static com.baidu.bifromq.dist.entity.EntityUtil.toQInboxId; import static com.baidu.bifromq.plugin.subbroker.TypeUtil.to; @@ -279,7 +280,8 @@ protected BatchMatchReply.Result match(String tenantId, int maxMembersPerSharedSubGroup) { long reqId = ThreadLocalRandom.current().nextInt(); String qInboxId = toQInboxId(subBroker, inboxId, delivererKey); - KVRangeSetting s = storeClient.findByKey(toMatchRecordKey(tenantId, topicFilter, qInboxId)).get(); + KVRangeSetting s = + findByKey(toMatchRecordKey(tenantId, topicFilter, qInboxId), storeClient.latestEffectiveRouter()).get(); String scopedTopicFilter = EntityUtil.toScopedTopicFilter(tenantId, qInboxId, topicFilter); DistServiceRWCoProcInput input = DistServiceRWCoProcInput.newBuilder() .setBatchMatch(BatchMatchRequest.newBuilder() @@ -306,7 +308,8 @@ protected BatchUnmatchReply.Result unmatch(String tenantId, String topicFilter, String delivererKey) { long reqId = ThreadLocalRandom.current().nextInt(); String qInboxId = toQInboxId(subBroker, inboxId, delivererKey); - KVRangeSetting s = storeClient.findByKey(toMatchRecordKey(tenantId, topicFilter, qInboxId)).get(); + KVRangeSetting s = + findByKey(toMatchRecordKey(tenantId, topicFilter, qInboxId), storeClient.latestEffectiveRouter()).get(); String scopedTopicFilter = EntityUtil.toScopedTopicFilter(tenantId, qInboxId, topicFilter); DistServiceRWCoProcInput input = DistServiceRWCoProcInput.newBuilder() .setBatchUnmatch(BatchUnmatchRequest.newBuilder() @@ -328,7 +331,8 @@ protected BatchUnmatchReply.Result unmatch(String tenantId, String topicFilter, protected BatchDistReply dist(String tenantId, List msgs, String orderKey) { long reqId = ThreadLocalRandom.current().nextInt(); - KVRangeSetting s = storeClient.findByKey(EntityUtil.matchRecordKeyPrefix(tenantId)).get(); + KVRangeSetting s = + findByKey(EntityUtil.matchRecordKeyPrefix(tenantId), storeClient.latestEffectiveRouter()).get(); BatchDistRequest request = BatchDistRequest.newBuilder() .setReqId(reqId) .addDistPack(DistPack.newBuilder() diff --git a/bifromq-inbox/bifromq-inbox-gc/src/main/java/com/baidu/bifromq/inbox/store/gc/InboxStoreGCProcessor.java b/bifromq-inbox/bifromq-inbox-gc/src/main/java/com/baidu/bifromq/inbox/store/gc/InboxStoreGCProcessor.java index 44f30835a..0b396747b 100644 --- a/bifromq-inbox/bifromq-inbox-gc/src/main/java/com/baidu/bifromq/inbox/store/gc/InboxStoreGCProcessor.java +++ b/bifromq-inbox/bifromq-inbox-gc/src/main/java/com/baidu/bifromq/inbox/store/gc/InboxStoreGCProcessor.java @@ -13,13 +13,14 @@ package com.baidu.bifromq.inbox.store.gc; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.upperBound; import static com.baidu.bifromq.inbox.util.KeyUtil.tenantPrefix; import static com.baidu.bifromq.inbox.util.MessageUtil.buildGCRequest; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.basekv.store.proto.KVRangeRORequest; import com.baidu.bifromq.basekv.store.proto.ROCoProcInput; @@ -61,10 +62,11 @@ public final CompletableFuture gc(long reqId, List rangeSettingList; if (tenantId != null) { ByteString tenantPrefix = tenantPrefix(tenantId); - rangeSettingList = storeClient.findByBoundary(Boundary.newBuilder() - .setStartKey(tenantPrefix).setEndKey(upperBound(tenantPrefix)).build()); + rangeSettingList = findByBoundary(Boundary.newBuilder() + .setStartKey(tenantPrefix).setEndKey(upperBound(tenantPrefix)).build(), + storeClient.latestEffectiveRouter()); } else { - rangeSettingList = storeClient.findByBoundary(FULL_BOUNDARY); + rangeSettingList = findByBoundary(FULL_BOUNDARY, storeClient.latestEffectiveRouter()); if (localServerId != null) { rangeSettingList.removeIf(rangeSetting -> !rangeSetting.leader.equals(localServerId)); } diff --git a/bifromq-inbox/bifromq-inbox-gc/src/test/java/com/baidu/bifromq/inbox/store/gc/InboxGCProcessorTest.java b/bifromq-inbox/bifromq-inbox-gc/src/test/java/com/baidu/bifromq/inbox/store/gc/InboxGCProcessorTest.java index b27255f06..2b5b591fe 100644 --- a/bifromq-inbox/bifromq-inbox-gc/src/test/java/com/baidu/bifromq/inbox/store/gc/InboxGCProcessorTest.java +++ b/bifromq-inbox/bifromq-inbox-gc/src/test/java/com/baidu/bifromq/inbox/store/gc/InboxGCProcessorTest.java @@ -14,11 +14,9 @@ package com.baidu.bifromq.inbox.store.gc; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; -import static com.baidu.bifromq.basekv.utils.BoundaryUtil.upperBound; -import static com.baidu.bifromq.inbox.util.KeyUtil.tenantPrefix; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -26,12 +24,13 @@ import static org.testng.Assert.assertFalse; import com.baidu.bifromq.basehlc.HLC; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.proto.KVRangeDescriptor; import com.baidu.bifromq.basekv.store.proto.KVRangeROReply; import com.baidu.bifromq.basekv.store.proto.ROCoProcOutput; import com.baidu.bifromq.basekv.store.proto.ReplyCode; +import com.baidu.bifromq.basekv.utils.BoundaryUtil; import com.baidu.bifromq.basekv.utils.KVRangeIdUtil; import com.baidu.bifromq.inbox.client.IInboxClient; import com.baidu.bifromq.inbox.rpc.proto.DetachRequest; @@ -40,6 +39,7 @@ import com.baidu.bifromq.type.ClientInfo; import java.util.Collections; import java.util.List; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import org.mockito.ArgumentCaptor; import org.mockito.Mock; @@ -62,7 +62,9 @@ public class InboxGCProcessorTest { private final String remoteStoreId = "testRemoteStoreId"; private final KVRangeSetting remoteRangeSetting = new KVRangeSetting("cluster", remoteStoreId, - KVRangeDescriptor.newBuilder().setId(KVRangeIdUtil.generate()).build()); + KVRangeDescriptor.newBuilder().setId(KVRangeIdUtil.generate()) + .setBoundary(FULL_BOUNDARY) + .build()); private final String tenantId = "testTenantId"; private AutoCloseable closeable; @@ -79,31 +81,41 @@ public void tearDown() throws Exception { @Test public void testNoRangeForTenant() { inboxGCProc = new InboxStoreGCProcessor(inboxClient, storeClient); - when(storeClient.findByBoundary(any())).thenReturn(Collections.emptyList()); + when(storeClient.latestEffectiveRouter()).thenReturn(Collections.emptyNavigableMap()); IInboxStoreGCProcessor.Result result = inboxGCProc.gc(System.nanoTime(), tenantId, 10, HLC.INST.getPhysical()).join(); assertEquals(result, IInboxStoreGCProcessor.Result.OK); - verify(storeClient).findByBoundary(argThat(boundary -> - boundary.getStartKey().equals(tenantPrefix(tenantId)) - && boundary.getEndKey().equals(upperBound(tenantPrefix(tenantId))))); + verify(storeClient, times(1)).latestEffectiveRouter(); } @Test public void testNoLocalRange() { inboxGCProc = new InboxStoreGCProcessor(inboxClient, storeClient, localStoreId); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(remoteRangeSetting)); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, remoteRangeSetting); + }}); + when(storeClient.query(eq(remoteStoreId), any())).thenReturn( + CompletableFuture.completedFuture(KVRangeROReply.newBuilder() + .setCode(ReplyCode.Ok) + .setRoCoProcResult(ROCoProcOutput.newBuilder() + .setInboxService(InboxServiceROCoProcOutput.newBuilder() + .setGc(GCReply.newBuilder() + .setCode(GCReply.Code.OK) + .build()) + .build()) + .build()) + .build())); IInboxStoreGCProcessor.Result result = inboxGCProc.gc(System.nanoTime(), tenantId, 10, HLC.INST.getPhysical()).join(); assertEquals(result, IInboxStoreGCProcessor.Result.OK); - verify(storeClient).findByBoundary(argThat(boundary -> - boundary.getStartKey().equals(tenantPrefix(tenantId)) - && boundary.getEndKey().equals(upperBound(tenantPrefix(tenantId))))); } @Test public void testStoreQueryException() { inboxGCProc = new InboxStoreGCProcessor(inboxClient, storeClient); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(remoteRangeSetting)); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, remoteRangeSetting); + }}); when(storeClient.query(anyString(), any())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException())); @@ -115,7 +127,9 @@ public void testStoreQueryException() { @Test public void testStoreQueryFailed() { inboxGCProc = new InboxStoreGCProcessor(inboxClient, storeClient); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(remoteRangeSetting)); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, remoteRangeSetting); + }}); when(storeClient.query(anyString(), any())) .thenReturn(CompletableFuture.completedFuture(KVRangeROReply.newBuilder() @@ -129,7 +143,10 @@ public void testStoreQueryFailed() { @Test public void testGCScanFailed() { inboxGCProc = new InboxStoreGCProcessor(inboxClient, storeClient); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(remoteRangeSetting)); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, remoteRangeSetting); + }}); + when(storeClient.query(anyString(), any())) .thenReturn(CompletableFuture.completedFuture(KVRangeROReply.newBuilder() .setCode(ReplyCode.Ok) @@ -163,7 +180,10 @@ public void detachAfterScan() { .setClient(ClientInfo.newBuilder().build()) .build(); inboxGCProc = new InboxStoreGCProcessor(inboxClient, storeClient); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(remoteRangeSetting)); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, remoteRangeSetting); + }}); + when(storeClient.query(anyString(), any())) .thenReturn(CompletableFuture.completedFuture(KVRangeROReply.newBuilder() .setCode(ReplyCode.Ok) @@ -210,7 +230,10 @@ public void expirySecondsOverride() { .setClient(ClientInfo.newBuilder().build()) .build(); inboxGCProc = new InboxStoreGCProcessor(inboxClient, storeClient); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(remoteRangeSetting)); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, remoteRangeSetting); + }}); + when(storeClient.query(anyString(), any())) .thenReturn(CompletableFuture.completedFuture(KVRangeROReply.newBuilder() .setCode(ReplyCode.Ok) diff --git a/bifromq-inbox/bifromq-inbox-store/src/test/java/com/baidu/bifromq/inbox/store/InboxStoreTest.java b/bifromq-inbox/bifromq-inbox-store/src/test/java/com/baidu/bifromq/inbox/store/InboxStoreTest.java index 67bc98878..e4b464985 100644 --- a/bifromq-inbox/bifromq-inbox-store/src/test/java/com/baidu/bifromq/inbox/store/InboxStoreTest.java +++ b/bifromq-inbox/bifromq-inbox-store/src/test/java/com/baidu/bifromq/inbox/store/InboxStoreTest.java @@ -13,6 +13,8 @@ package com.baidu.bifromq.inbox.store; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static com.baidu.bifromq.inbox.util.KeyUtil.inboxPrefix; import static com.baidu.bifromq.metrics.TenantMetric.MqttPersistentSessionNumGauge; @@ -31,9 +33,9 @@ import com.baidu.bifromq.basecrdt.service.CRDTServiceOptions; import com.baidu.bifromq.basecrdt.service.ICRDTService; import com.baidu.bifromq.baseenv.EnvProvider; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.balance.option.KVRangeBalanceControllerOptions; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.localengine.rocksdb.RocksDBCPableKVEngineConfigurator; import com.baidu.bifromq.basekv.localengine.rocksdb.RocksDBWALableKVEngineConfigurator; import com.baidu.bifromq.basekv.store.option.KVRangeStoreOptions; @@ -254,7 +256,7 @@ public void afterCaseFinish(Method method) { } private InboxServiceROCoProcOutput query(ByteString routeKey, InboxServiceROCoProcInput input) { - KVRangeSetting s = storeClient.findByKey(routeKey).get(); + KVRangeSetting s = findByKey(routeKey, storeClient.latestEffectiveRouter()).get(); return query(s, input); } @@ -272,7 +274,7 @@ private InboxServiceROCoProcOutput query(KVRangeSetting s, InboxServiceROCoProcI private InboxServiceRWCoProcOutput mutate(ByteString routeKey, InboxServiceRWCoProcInput input) { - KVRangeSetting s = storeClient.findByKey(routeKey).get(); + KVRangeSetting s = findByKey(routeKey, storeClient.latestEffectiveRouter()).get(); KVRangeRWReply reply = storeClient.execute(s.leader, KVRangeRWRequest.newBuilder() .setReqId(input.getReqId()) .setVer(s.ver) @@ -290,7 +292,7 @@ protected GCReply requestGCScan(GCRequest request) { .setReqId(reqId) .setGc(request) .build(); - List rangeSettings = storeClient.findByBoundary(FULL_BOUNDARY); + List rangeSettings = findByBoundary(FULL_BOUNDARY, storeClient.latestEffectiveRouter()); assert !rangeSettings.isEmpty(); InboxServiceROCoProcOutput output = query(rangeSettings.get(0), input); assertTrue(output.hasGc()); diff --git a/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java b/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java index ae7943884..039b09805 100644 --- a/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java +++ b/bifromq-retain/bifromq-retain-gc/src/main/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessor.java @@ -13,12 +13,14 @@ package com.baidu.bifromq.retain.store.gc; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static com.baidu.bifromq.retain.utils.KeyUtil.tenantNS; import static com.baidu.bifromq.retain.utils.MessageUtil.buildGCRequest; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.store.proto.KVRangeRWRequest; import com.baidu.bifromq.basekv.store.proto.RWCoProcInput; import java.util.Arrays; @@ -43,14 +45,15 @@ public CompletableFuture gc(long reqId, @Nullable Integer expirySeconds, long now) { if (tenantId != null) { - Optional rangeSettingOpt = storeClient.findByKey(tenantNS(tenantId)); + Optional rangeSettingOpt = + findByKey(tenantNS(tenantId), storeClient.latestEffectiveRouter()); if (rangeSettingOpt.isEmpty()) { return CompletableFuture.completedFuture(Result.ERROR); } KVRangeSetting rangeSetting = rangeSettingOpt.get(); return gcRange(reqId, rangeSetting, tenantId, expirySeconds, now); } else { - CompletableFuture[] gcFutures = storeClient.findByBoundary(FULL_BOUNDARY) + CompletableFuture[] gcFutures = findByBoundary(FULL_BOUNDARY, storeClient.latestEffectiveRouter()) .stream() .filter(k -> localServerId == null || k.leader.equals(localServerId)) .map(rangeSetting -> gcRange(reqId, rangeSetting, null, expirySeconds, now)) diff --git a/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java b/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java index e098f5012..3a7a8eec6 100644 --- a/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java +++ b/bifromq-retain/bifromq-retain-gc/src/test/java/com/baidu/bifromq/retain/store/gc/RetainStoreGCProcessorTest.java @@ -24,13 +24,17 @@ import static org.testng.Assert.assertEquals; import com.baidu.bifromq.basehlc.HLC; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; +import com.baidu.bifromq.basekv.proto.Boundary; import com.baidu.bifromq.basekv.proto.KVRangeDescriptor; +import com.baidu.bifromq.basekv.utils.BoundaryUtil; import com.baidu.bifromq.basekv.utils.KVRangeIdUtil; import com.baidu.bifromq.retain.rpc.proto.GCRequest; -import java.util.List; +import com.google.protobuf.ByteString; +import java.util.Collections; import java.util.Optional; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -58,11 +62,11 @@ public void teardown() throws Exception { public void testGCNonExistTenant() { String tenantId = "tenantId"; gcProcessor = new RetainStoreGCProcessor(storeClient, null); - when(storeClient.findByKey(any())).thenReturn(Optional.empty()); + when(storeClient.latestEffectiveRouter()).thenReturn(Collections.emptyNavigableMap()); + IRetainStoreGCProcessor.Result result = gcProcessor.gc(System.nanoTime(), tenantId, null, HLC.INST.getPhysical()).join(); assertEquals(result, IRetainStoreGCProcessor.Result.ERROR); - verify(storeClient).findByKey(eq(tenantNS(tenantId))); } @Test @@ -74,12 +78,14 @@ public void testGCTenantWithNullExpirySeconds() { KVRangeDescriptor rangeDescriptor = KVRangeDescriptor.newBuilder() .setId(KVRangeIdUtil.generate()) .setVer(1) + .setBoundary(FULL_BOUNDARY) .build(); KVRangeSetting setting = new KVRangeSetting("clueter", "store", rangeDescriptor); - when(storeClient.findByKey(any())).thenReturn(Optional.of(setting)); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting); + }}); when(storeClient.execute(anyString(), any())).thenReturn(new CompletableFuture<>()); gcProcessor.gc(reqId, tenantId, null, now); - verify(storeClient).findByKey(eq(tenantNS(tenantId))); verify(storeClient).execute(eq(setting.leader), argThat(req -> { if (req.getReqId() != reqId || req.getVer() != rangeDescriptor.getVer() @@ -104,12 +110,14 @@ public void testGCTenantWithExpirySeconds() { KVRangeDescriptor rangeDescriptor = KVRangeDescriptor.newBuilder() .setId(KVRangeIdUtil.generate()) .setVer(1) + .setBoundary(FULL_BOUNDARY) .build(); - KVRangeSetting setting = new KVRangeSetting("clueter", "store", rangeDescriptor); - when(storeClient.findByKey(any())).thenReturn(Optional.of(setting)); + KVRangeSetting setting = new KVRangeSetting("cluster", "store", rangeDescriptor); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(FULL_BOUNDARY, setting); + }}); when(storeClient.execute(anyString(), any())).thenReturn(new CompletableFuture<>()); gcProcessor.gc(reqId, tenantId, expirySeconds, now); - verify(storeClient).findByKey(eq(tenantNS(tenantId))); verify(storeClient).execute(eq(setting.leader), argThat(req -> { if (req.getReqId() != reqId || req.getVer() != rangeDescriptor.getVer() @@ -131,19 +139,28 @@ public void testGCWithLocalStoreAndExpirySeconds() { int expirySeconds = 10; String localStoreId = "localStore"; gcProcessor = new RetainStoreGCProcessor(storeClient, localStoreId); - KVRangeDescriptor rangeDescriptor = KVRangeDescriptor.newBuilder() - .setId(KVRangeIdUtil.generate()) + KVRangeDescriptor localDescriptor = KVRangeDescriptor.newBuilder() + .setId(KVRangeIdUtil.next(1)) + .setVer(1) + .setBoundary(Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("a")).build()) + .build(); + KVRangeDescriptor remoteDescriptor = KVRangeDescriptor.newBuilder() + .setId(KVRangeIdUtil.next(1)) .setVer(1) + .setBoundary(Boundary.newBuilder().setStartKey(ByteString.copyFromUtf8("a")).build()) .build(); - KVRangeSetting localSetting = new KVRangeSetting("cluster", localStoreId, rangeDescriptor); - KVRangeSetting remoteSetting = new KVRangeSetting("cluster", "remoteStore", rangeDescriptor); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(localSetting, remoteSetting)); + KVRangeSetting localSetting = new KVRangeSetting("cluster", localStoreId, localDescriptor); + KVRangeSetting remoteSetting = new KVRangeSetting("cluster", "remoteStore", remoteDescriptor); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(localDescriptor.getBoundary(), localSetting); + put(remoteDescriptor.getBoundary(), remoteSetting); + }}); when(storeClient.execute(anyString(), any())).thenReturn(new CompletableFuture<>()); gcProcessor.gc(reqId, null, expirySeconds, now); verify(storeClient).execute(eq(localSetting.leader), argThat(req -> { if (req.getReqId() != reqId - || req.getVer() != rangeDescriptor.getVer() - || !req.getKvRangeId().equals(rangeDescriptor.getId())) { + || req.getVer() != localDescriptor.getVer() + || !req.getKvRangeId().equals(localDescriptor.getId())) { return false; } GCRequest gcRequest = req.getRwCoProc().getRetainService().getGc(); @@ -160,19 +177,28 @@ public void testGCWithLocalStoreSpecified() { long now = HLC.INST.getPhysical(); String localStoreId = "localStore"; gcProcessor = new RetainStoreGCProcessor(storeClient, localStoreId); - KVRangeDescriptor rangeDescriptor = KVRangeDescriptor.newBuilder() - .setId(KVRangeIdUtil.generate()) + KVRangeDescriptor localDescriptor = KVRangeDescriptor.newBuilder() + .setId(KVRangeIdUtil.next(1)) .setVer(1) + .setBoundary(Boundary.newBuilder().setEndKey(ByteString.copyFromUtf8("a")).build()) .build(); - KVRangeSetting localSetting = new KVRangeSetting("cluster", localStoreId, rangeDescriptor); - KVRangeSetting remoteSetting = new KVRangeSetting("cluster", "remoteStore", rangeDescriptor); - when(storeClient.findByBoundary(FULL_BOUNDARY)).thenReturn(List.of(localSetting, remoteSetting)); + KVRangeDescriptor remoteDescriptor = KVRangeDescriptor.newBuilder() + .setId(KVRangeIdUtil.next(1)) + .setVer(1) + .setBoundary(Boundary.newBuilder().setStartKey(ByteString.copyFromUtf8("a")).build()) + .build(); + KVRangeSetting localSetting = new KVRangeSetting("cluster", localStoreId, localDescriptor); + KVRangeSetting remoteSetting = new KVRangeSetting("cluster", "remoteStore", remoteDescriptor); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ + put(localDescriptor.getBoundary(), localSetting); + put(remoteDescriptor.getBoundary(), remoteSetting); + }}); when(storeClient.execute(anyString(), any())).thenReturn(new CompletableFuture<>()); gcProcessor.gc(reqId, null, null, now); verify(storeClient).execute(eq(localSetting.leader), argThat(req -> { if (req.getReqId() != reqId - || req.getVer() != rangeDescriptor.getVer() - || !req.getKvRangeId().equals(rangeDescriptor.getId())) { + || req.getVer() != localDescriptor.getVer() + || !req.getKvRangeId().equals(localDescriptor.getId())) { return false; } GCRequest gcRequest = req.getRwCoProc().getRetainService().getGc(); diff --git a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainStoreTest.java b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainStoreTest.java index 7a7b64aa1..ba24e2d14 100644 --- a/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainStoreTest.java +++ b/bifromq-retain/bifromq-retain-store/src/test/java/com/baidu/bifromq/retain/store/RetainStoreTest.java @@ -13,6 +13,8 @@ package com.baidu.bifromq.retain.store; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; +import static com.baidu.bifromq.basekv.client.KVRangeRouterUtil.findByKey; import static com.baidu.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; import static com.baidu.bifromq.metrics.TenantMetric.MqttRetainNumGauge; import static com.baidu.bifromq.metrics.TenantMetric.MqttRetainSpaceGauge; @@ -29,9 +31,9 @@ import com.baidu.bifromq.basecrdt.service.ICRDTService; import com.baidu.bifromq.baseenv.EnvProvider; import com.baidu.bifromq.basehlc.HLC; -import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.balance.option.KVRangeBalanceControllerOptions; import com.baidu.bifromq.basekv.client.IBaseKVStoreClient; +import com.baidu.bifromq.basekv.client.KVRangeSetting; import com.baidu.bifromq.basekv.localengine.rocksdb.RocksDBCPableKVEngineConfigurator; import com.baidu.bifromq.basekv.localengine.rocksdb.RocksDBWALableKVEngineConfigurator; import com.baidu.bifromq.basekv.store.option.KVRangeStoreOptions; @@ -201,7 +203,7 @@ public void tearDown() throws Exception { protected RetainResult.Code requestRetain(String tenantId, TopicMessage topicMsg) { long reqId = ThreadLocalRandom.current().nextInt(); ByteString tenantNS = KeyUtil.tenantNS(tenantId); - KVRangeSetting s = storeClient.findByKey(tenantNS).get(); + KVRangeSetting s = findByKey(tenantNS, storeClient.latestEffectiveRouter()).get(); String topic = topicMsg.getTopic(); Message message = topicMsg.getMessage(); BatchRetainRequest request = BatchRetainRequest.newBuilder() @@ -235,7 +237,7 @@ protected MatchResult requestMatch(String tenantId, String topicFilter, int limi protected MatchResult requestMatch(String tenantId, long now, String topicFilter, int limit) { long reqId = ThreadLocalRandom.current().nextInt(); ByteString tenantNS = KeyUtil.tenantNS(tenantId); - KVRangeSetting s = storeClient.findByKey(tenantNS).get(); + KVRangeSetting s = findByKey(tenantNS, storeClient.latestEffectiveRouter()).get(); BatchMatchRequest request = BatchMatchRequest.newBuilder() .setReqId(reqId) .putMatchParams(tenantId, MatchParam.newBuilder() @@ -260,7 +262,7 @@ protected MatchResult requestMatch(String tenantId, long now, String topicFilter protected GCReply requestGC(long now, String tenantId, Integer expirySeconds) { long reqId = ThreadLocalRandom.current().nextInt(); - KVRangeSetting s = storeClient.findByBoundary(FULL_BOUNDARY).get(0); + KVRangeSetting s = findByBoundary(FULL_BOUNDARY, storeClient.latestEffectiveRouter()).get(0); RetainServiceRWCoProcInput input = buildGCRequest(reqId, now, tenantId, expirySeconds); KVRangeRWReply reply = storeClient.execute(s.leader, KVRangeRWRequest.newBuilder() .setReqId(reqId)