Skip to content

Commit

Permalink
explicitly refresh route cache to ensure it reflects latest subscript…
Browse files Browse the repository at this point in the history
…ion changes
  • Loading branch information
popduke committed Oct 28, 2024
1 parent fc61cd4 commit 32d795e
Showing 1 changed file with 8 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,16 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import lombok.EqualsAndHashCode;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.Nullable;

class TenantRouteCache implements ITenantRouteCache {
private record RouteCacheKey(String topic, Boundary matchRecordBoundary) {
}

@EqualsAndHashCode
private static class RouteCacheIndexKey {
final RouteCacheKey cacheKey;
@EqualsAndHashCode.Exclude
volatile boolean refresh;

private RouteCacheIndexKey(RouteCacheKey cacheKey) {
this.cacheKey = cacheKey;
}
}

private final String tenantId;
private final AsyncLoadingCache<RouteCacheKey, Set<Matching>> routesCache;
private final TopicIndex<RouteCacheIndexKey> index;
private final TopicIndex<RouteCacheKey> index;

TenantRouteCache(String tenantId,
ITenantRouteMatcher matcher,
Expand Down Expand Up @@ -81,18 +69,17 @@ private RouteCacheIndexKey(RouteCacheKey cacheKey) {
}
})
.expireAfterAccess(expiryAfterAccess)
.refreshAfterWrite(Duration.ofSeconds(1))
.evictionListener((key, value, cause) -> {
if (key != null) {
index.remove(key.topic, new RouteCacheIndexKey(key));
index.remove(key.topic, key);
}
})
.buildAsync(new CacheLoader<>() {
@Override
public @Nullable Set<Matching> load(RouteCacheKey key) {
ITenantMeter.get(tenantId).recordCount(TenantMetric.MqttRouteCacheMissCount);
Map<String, Set<Matching>> results = matcher.matchAll(Collections.singleton(key.topic));
index.add(key.topic, new RouteCacheIndexKey(key));
index.add(key.topic, key);
return results.get(key.topic);
}

Expand All @@ -106,25 +93,15 @@ public Map<RouteCacheKey, Set<Matching>> loadAll(Set<? extends RouteCacheKey> ke
for (Map.Entry<String, Set<Matching>> entry : resultMap.entrySet()) {
RouteCacheKey key = topicToKeyMap.get(entry.getKey());
result.put(key, entry.getValue());
index.add(key.topic, new RouteCacheIndexKey(key));
index.add(key.topic, key);
}
return result;
}

@Override
public @Nullable Set<Matching> reload(RouteCacheKey key, Set<Matching> oldValue) {
Set<RouteCacheIndexKey> indexKey = index.get(key.topic);
if (indexKey.isEmpty() || index.get(key.topic).stream().anyMatch(k -> {
if (k.cacheKey.equals(key) && k.refresh) {
k.refresh = false;
return true;
}
return false;
})) {
Map<String, Set<Matching>> results = matcher.matchAll(Collections.singleton(key.topic));
return results.get(key.topic);
}
return oldValue;
Map<String, Set<Matching>> results = matcher.matchAll(Collections.singleton(key.topic));
return results.get(key.topic);
}
});
ITenantMeter.gauging(tenantId, TenantMetric.MqttRouteCacheSize, routesCache.synchronous()::estimatedSize);
Expand All @@ -133,8 +110,8 @@ public Map<RouteCacheKey, Set<Matching>> loadAll(Set<? extends RouteCacheKey> ke
@Override
public void refresh(Set<String> topicFilters) {
topicFilters.forEach(topicFilter -> {
for (RouteCacheIndexKey cacheKey : index.match(topicFilter)) {
cacheKey.refresh = true;
for (RouteCacheKey cacheKey : index.match(topicFilter)) {
routesCache.synchronous().refresh(cacheKey);
}
});
}
Expand Down

0 comments on commit 32d795e

Please sign in to comment.