Skip to content

Commit

Permalink
Reduce read-write contention to improve the efficiency of collecting …
Browse files Browse the repository at this point in the history
…metrics
  • Loading branch information
popduke committed Aug 8, 2024
1 parent 39141a5 commit 70f4d0d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class KVLoadBasedSplitHinter implements IKVRangeSplitHinter {
private final Gauge ioDensityGuage;
private final Gauge ioLatencyNanosGauge;
private final Gauge avgLatencyNanosGauge;
private volatile SplitHint latestHint = SplitHint.getDefaultInstance();

public KVLoadBasedSplitHinter(Supplier<Long> nanoSource,
Duration windowSize,
Expand All @@ -51,17 +52,17 @@ public KVLoadBasedSplitHinter(Supplier<Long> nanoSource,
this.windowSizeNanos = windowSize.toNanos();
this.toSplitKey = toSplitKey;
ioDensityGuage = Gauge.builder("basekv.load.est.iodensity",
() -> estimate().getLoadOrDefault(LOAD_TYPE_IO_DENSITY, 0))
() -> latestHint.getLoadOrDefault(LOAD_TYPE_IO_DENSITY, 0))
.tags(tags)
.tags("type", type())
.register(Metrics.globalRegistry);
ioLatencyNanosGauge = Gauge.builder("basekv.load.est.iolatency",
() -> estimate().getLoadOrDefault(LOAD_TYPE_IO_LATENCY_NANOS, 0))
() -> latestHint.getLoadOrDefault(LOAD_TYPE_IO_LATENCY_NANOS, 0))
.tags(tags)
.tags("type", type())
.register(Metrics.globalRegistry);
avgLatencyNanosGauge = Gauge.builder("basekv.load.est.avglatency", () ->
estimate().getLoadOrDefault(LOAD_TYPE_AVG_LATENCY_NANOS, 0))
latestHint.getLoadOrDefault(LOAD_TYPE_AVG_LATENCY_NANOS, 0))
.tags(tags)
.tags("type", type())
.register(Metrics.globalRegistry);
Expand All @@ -78,7 +79,8 @@ public SplitHint estimate() {
long currentSlot = getSlot();
trackedKeySlots.headMap(currentSlot - 2).clear();
recentLoadHints.headMap(currentSlot).clear();
return recentLoadHints.computeIfAbsent(currentSlot, n -> doEstimate(n - 1));
latestHint = recentLoadHints.computeIfAbsent(currentSlot, n -> doEstimate(n - 1));
return latestHint;
}

protected void onRecord(IKVLoadRecord kvLoadRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class DistWorkerCoProc implements IKVRangeCoProc {
private final SubscriptionCache routeCache;
private final TenantsState tenantsState;
private final DeliverExecutorGroup fanoutExecutorGroup;
private transient Boundary boundary;

public DistWorkerCoProc(String clusterId,
String storeId,
Expand Down Expand Up @@ -223,7 +224,7 @@ private Runnable batchMatch(BatchMatchRequest request,
touchedTopics.add(ScopedTopic.builder()
.tenantId(tenantId)
.topic(topicFilter)
.boundary(reader.boundary())
.boundary(boundary)
.build());
}
replyBuilder.putResults(scopedTopicFilter, BatchMatchReply.Result.OK);
Expand Down Expand Up @@ -278,7 +279,7 @@ private Runnable batchMatch(BatchMatchRequest request,
touchedTopics.add(ScopedTopic.builder()
.tenantId(parseTenantId(groupMatchRecordKey))
.topic(groupTopicFilter)
.boundary(reader.boundary())
.boundary(boundary)
.build());
}
});
Expand Down Expand Up @@ -314,7 +315,7 @@ private Runnable batchUnmatch(BatchUnmatchRequest request,
touchedTopics.add(ScopedTopic.builder()
.tenantId(tenantId)
.topic(topicFilter)
.boundary(reader.boundary())
.boundary(boundary)
.build());
replyBuilder.putResults(scopedTopicFilter, BatchUnmatchReply.Result.OK);
} else {
Expand Down Expand Up @@ -362,7 +363,7 @@ private Runnable batchUnmatch(BatchUnmatchRequest request,
touchedTopics.add(ScopedTopic.builder()
.tenantId(parseTenantId(groupMatchRecordKey))
.topic(groupTopicFilter)
.boundary(reader.boundary())
.boundary(boundary)
.build());
}
} else {
Expand All @@ -388,19 +389,19 @@ private CompletableFuture<BatchDistReply> batchDist(BatchDistRequest request, IK
List<CompletableFuture<Map<String, Map<String, Integer>>>> distFanOutFutures = new ArrayList<>();
for (DistPack distPack : distPackList) {
String tenantId = distPack.getTenantId();
Boundary boundary = intersect(Boundary.newBuilder()
Boundary tenantBoundary = intersect(Boundary.newBuilder()
.setStartKey(matchRecordKeyPrefix(tenantId))
.setEndKey(tenantUpperBound(tenantId))
.build(), reader.boundary());
if (isEmptyRange(boundary)) {
.build(), boundary);
if (isEmptyRange(tenantBoundary)) {
continue;
}
for (TopicMessagePack topicMsgPack : distPack.getMsgPackList()) {
String topic = topicMsgPack.getTopic();
ScopedTopic scopedTopic = ScopedTopic.builder()
.tenantId(tenantId)
.topic(topic)
.boundary(reader.boundary())
.boundary(boundary)
.build();
distFanOutFutures.add(routeCache.get(scopedTopic)
.thenApply(matchResult -> {
Expand All @@ -426,6 +427,7 @@ private CompletableFuture<BatchDistReply> batchDist(BatchDistRequest request, IK

private void load() {
try (IKVCloseableReader reader = readerProvider.get()) {
boundary = reader.boundary();
IKVIterator itr = reader.iterator();
for (itr.seekToFirst(); itr.isValid(); ) {
String tenantId = parseTenantId(itr.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ class TenantsState {
private final Map<String, TenantRouteState> tenantRouteStates = new ConcurrentHashMap<>();
private final IKVCloseableReader reader;
private final String[] tags;
private transient Boundary boundary;

TenantsState(IKVCloseableReader reader, String... tags) {
this.reader = reader;
this.tags = tags;
boundary = reader.boundary();
}

void incNormalRoutes(String tenantId) {
Expand Down Expand Up @@ -90,6 +92,7 @@ void decSharedRoutes(String tenantId, int count) {
void reset() {
tenantRouteStates.values().forEach(TenantRouteState::destroy);
tenantRouteStates.clear();
boundary = reader.boundary();
}

void close() {
Expand All @@ -100,7 +103,7 @@ void close() {
private Supplier<Number> getSpaceUsageProvider(String tenantId) {
return () -> {
try {
return reader.size(intersect(reader.boundary(), Boundary.newBuilder()
return reader.size(intersect(boundary, Boundary.newBuilder()
.setStartKey(tenantPrefix(tenantId))
.setEndKey(tenantUpperBound(tenantId))
.build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.baidu.bifromq.inbox.storage.proto.InboxMetadata;
import com.baidu.bifromq.plugin.eventcollector.IEventCollector;
import com.google.protobuf.ByteString;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand All @@ -36,11 +37,13 @@ class TenantsState {
private final IEventCollector eventCollector;
private final IKVCloseableReader reader;
private final String[] tags;
private transient Boundary boundary;

TenantsState(IEventCollector eventCollector, IKVCloseableReader reader, String... tags) {
this.eventCollector = eventCollector;
this.reader = reader;
this.tags = tags;
boundary = reader.boundary();
}

Collection<InboxMetadata> getAll(String tenantId) {
Expand Down Expand Up @@ -91,6 +94,7 @@ void reset() {
tenantStates.values().forEach(TenantInboxSet::removeAll);
tenantStates.values().forEach(TenantInboxSet::destroy);
tenantStates.clear();
boundary = reader.boundary();
}

void close() {
Expand All @@ -102,11 +106,12 @@ private Supplier<Number> getTenantUsedSpace(String tenantId) {
return () -> {
try {
ByteString startKey = tenantPrefix(tenantId);
ByteString endKey = upperBound(tenantPrefix(tenantId));
return reader.size(intersect(reader.boundary(), Boundary.newBuilder()
ByteString endKey = upperBound(startKey);
Boundary tenantBoundary = intersect(boundary, Boundary.newBuilder()
.setStartKey(startKey)
.setEndKey(endKey)
.build()));
.build());
return reader.size(tenantBoundary);
} catch (Exception e) {
log.error("Failed to get used space for tenant:{}", tenantId, e);
return 0;
Expand Down

0 comments on commit 70f4d0d

Please sign in to comment.