From 70f4d0dac3bc1d789b38fb350a7a28a11cdc6c76 Mon Sep 17 00:00:00 2001 From: Yonny Hao Date: Thu, 8 Aug 2024 14:59:40 +0800 Subject: [PATCH] Reduce read-write contention to improve the efficiency of collecting metrics --- .../range/hinter/KVLoadBasedSplitHinter.java | 10 ++++++---- .../bifromq/dist/worker/DistWorkerCoProc.java | 18 ++++++++++-------- .../bifromq/dist/worker/TenantsState.java | 5 ++++- .../bifromq/inbox/store/TenantsState.java | 11 ++++++++--- 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/hinter/KVLoadBasedSplitHinter.java b/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/hinter/KVLoadBasedSplitHinter.java index 02e206be9..e772105c1 100644 --- a/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/hinter/KVLoadBasedSplitHinter.java +++ b/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/hinter/KVLoadBasedSplitHinter.java @@ -41,6 +41,7 @@ public abstract class KVLoadBasedSplitHinter implements IKVRangeSplitHinter { private final Gauge ioDensityGuage; private final Gauge ioLatencyNanosGauge; private final Gauge avgLatencyNanosGauge; + private volatile SplitHint latestHint = SplitHint.getDefaultInstance(); public KVLoadBasedSplitHinter(Supplier nanoSource, Duration windowSize, @@ -51,17 +52,17 @@ public KVLoadBasedSplitHinter(Supplier nanoSource, this.windowSizeNanos = windowSize.toNanos(); this.toSplitKey = toSplitKey; ioDensityGuage = Gauge.builder("basekv.load.est.iodensity", - () -> estimate().getLoadOrDefault(LOAD_TYPE_IO_DENSITY, 0)) + () -> latestHint.getLoadOrDefault(LOAD_TYPE_IO_DENSITY, 0)) .tags(tags) .tags("type", type()) .register(Metrics.globalRegistry); ioLatencyNanosGauge = Gauge.builder("basekv.load.est.iolatency", - () -> estimate().getLoadOrDefault(LOAD_TYPE_IO_LATENCY_NANOS, 0)) + () -> latestHint.getLoadOrDefault(LOAD_TYPE_IO_LATENCY_NANOS, 0)) .tags(tags) .tags("type", type()) .register(Metrics.globalRegistry); avgLatencyNanosGauge = Gauge.builder("basekv.load.est.avglatency", () -> - estimate().getLoadOrDefault(LOAD_TYPE_AVG_LATENCY_NANOS, 0)) + latestHint.getLoadOrDefault(LOAD_TYPE_AVG_LATENCY_NANOS, 0)) .tags(tags) .tags("type", type()) .register(Metrics.globalRegistry); @@ -78,7 +79,8 @@ public SplitHint estimate() { long currentSlot = getSlot(); trackedKeySlots.headMap(currentSlot - 2).clear(); recentLoadHints.headMap(currentSlot).clear(); - return recentLoadHints.computeIfAbsent(currentSlot, n -> doEstimate(n - 1)); + latestHint = recentLoadHints.computeIfAbsent(currentSlot, n -> doEstimate(n - 1)); + return latestHint; } protected void onRecord(IKVLoadRecord kvLoadRecord) { diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java index 5b25abe4e..15d3bfcc2 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java +++ b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/DistWorkerCoProc.java @@ -97,6 +97,7 @@ class DistWorkerCoProc implements IKVRangeCoProc { private final SubscriptionCache routeCache; private final TenantsState tenantsState; private final DeliverExecutorGroup fanoutExecutorGroup; + private transient Boundary boundary; public DistWorkerCoProc(String clusterId, String storeId, @@ -223,7 +224,7 @@ private Runnable batchMatch(BatchMatchRequest request, touchedTopics.add(ScopedTopic.builder() .tenantId(tenantId) .topic(topicFilter) - .boundary(reader.boundary()) + .boundary(boundary) .build()); } replyBuilder.putResults(scopedTopicFilter, BatchMatchReply.Result.OK); @@ -278,7 +279,7 @@ private Runnable batchMatch(BatchMatchRequest request, touchedTopics.add(ScopedTopic.builder() .tenantId(parseTenantId(groupMatchRecordKey)) .topic(groupTopicFilter) - .boundary(reader.boundary()) + .boundary(boundary) .build()); } }); @@ -314,7 +315,7 @@ private Runnable batchUnmatch(BatchUnmatchRequest request, touchedTopics.add(ScopedTopic.builder() .tenantId(tenantId) .topic(topicFilter) - .boundary(reader.boundary()) + .boundary(boundary) .build()); replyBuilder.putResults(scopedTopicFilter, BatchUnmatchReply.Result.OK); } else { @@ -362,7 +363,7 @@ private Runnable batchUnmatch(BatchUnmatchRequest request, touchedTopics.add(ScopedTopic.builder() .tenantId(parseTenantId(groupMatchRecordKey)) .topic(groupTopicFilter) - .boundary(reader.boundary()) + .boundary(boundary) .build()); } } else { @@ -388,11 +389,11 @@ private CompletableFuture batchDist(BatchDistRequest request, IK List>>> distFanOutFutures = new ArrayList<>(); for (DistPack distPack : distPackList) { String tenantId = distPack.getTenantId(); - Boundary boundary = intersect(Boundary.newBuilder() + Boundary tenantBoundary = intersect(Boundary.newBuilder() .setStartKey(matchRecordKeyPrefix(tenantId)) .setEndKey(tenantUpperBound(tenantId)) - .build(), reader.boundary()); - if (isEmptyRange(boundary)) { + .build(), boundary); + if (isEmptyRange(tenantBoundary)) { continue; } for (TopicMessagePack topicMsgPack : distPack.getMsgPackList()) { @@ -400,7 +401,7 @@ private CompletableFuture batchDist(BatchDistRequest request, IK ScopedTopic scopedTopic = ScopedTopic.builder() .tenantId(tenantId) .topic(topic) - .boundary(reader.boundary()) + .boundary(boundary) .build(); distFanOutFutures.add(routeCache.get(scopedTopic) .thenApply(matchResult -> { @@ -426,6 +427,7 @@ private CompletableFuture batchDist(BatchDistRequest request, IK private void load() { try (IKVCloseableReader reader = readerProvider.get()) { + boundary = reader.boundary(); IKVIterator itr = reader.iterator(); for (itr.seekToFirst(); itr.isValid(); ) { String tenantId = parseTenantId(itr.key()); diff --git a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/TenantsState.java b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/TenantsState.java index 367d354bd..4a89cd3ec 100644 --- a/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/TenantsState.java +++ b/bifromq-dist/bifromq-dist-worker/src/main/java/com/baidu/bifromq/dist/worker/TenantsState.java @@ -29,10 +29,12 @@ class TenantsState { private final Map tenantRouteStates = new ConcurrentHashMap<>(); private final IKVCloseableReader reader; private final String[] tags; + private transient Boundary boundary; TenantsState(IKVCloseableReader reader, String... tags) { this.reader = reader; this.tags = tags; + boundary = reader.boundary(); } void incNormalRoutes(String tenantId) { @@ -90,6 +92,7 @@ void decSharedRoutes(String tenantId, int count) { void reset() { tenantRouteStates.values().forEach(TenantRouteState::destroy); tenantRouteStates.clear(); + boundary = reader.boundary(); } void close() { @@ -100,7 +103,7 @@ void close() { private Supplier getSpaceUsageProvider(String tenantId) { return () -> { try { - return reader.size(intersect(reader.boundary(), Boundary.newBuilder() + return reader.size(intersect(boundary, Boundary.newBuilder() .setStartKey(tenantPrefix(tenantId)) .setEndKey(tenantUpperBound(tenantId)) .build())); diff --git a/bifromq-inbox/bifromq-inbox-store/src/main/java/com/baidu/bifromq/inbox/store/TenantsState.java b/bifromq-inbox/bifromq-inbox-store/src/main/java/com/baidu/bifromq/inbox/store/TenantsState.java index decf0461f..e15d5d7f3 100644 --- a/bifromq-inbox/bifromq-inbox-store/src/main/java/com/baidu/bifromq/inbox/store/TenantsState.java +++ b/bifromq-inbox/bifromq-inbox-store/src/main/java/com/baidu/bifromq/inbox/store/TenantsState.java @@ -22,6 +22,7 @@ import com.baidu.bifromq.inbox.storage.proto.InboxMetadata; import com.baidu.bifromq.plugin.eventcollector.IEventCollector; import com.google.protobuf.ByteString; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -36,11 +37,13 @@ class TenantsState { private final IEventCollector eventCollector; private final IKVCloseableReader reader; private final String[] tags; + private transient Boundary boundary; TenantsState(IEventCollector eventCollector, IKVCloseableReader reader, String... tags) { this.eventCollector = eventCollector; this.reader = reader; this.tags = tags; + boundary = reader.boundary(); } Collection getAll(String tenantId) { @@ -91,6 +94,7 @@ void reset() { tenantStates.values().forEach(TenantInboxSet::removeAll); tenantStates.values().forEach(TenantInboxSet::destroy); tenantStates.clear(); + boundary = reader.boundary(); } void close() { @@ -102,11 +106,12 @@ private Supplier getTenantUsedSpace(String tenantId) { return () -> { try { ByteString startKey = tenantPrefix(tenantId); - ByteString endKey = upperBound(tenantPrefix(tenantId)); - return reader.size(intersect(reader.boundary(), Boundary.newBuilder() + ByteString endKey = upperBound(startKey); + Boundary tenantBoundary = intersect(boundary, Boundary.newBuilder() .setStartKey(startKey) .setEndKey(endKey) - .build())); + .build()); + return reader.size(tenantBoundary); } catch (Exception e) { log.error("Failed to get used space for tenant:{}", tenantId, e); return 0;