From 1f0a47f62c5870c7483dc9021c1bae51d2c2821a Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Tue, 18 Jun 2024 09:06:25 -0700 Subject: [PATCH] d'oh. observers now concurrent skip list. reject joins if not observer. better logging --- .../apollo/fireflies/ViewManagement.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 2521d07d4..5e18e4338 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -30,10 +30,7 @@ import java.time.Duration; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -51,7 +48,7 @@ public class ViewManagement { private static final Logger log = LoggerFactory.getLogger(ViewManagement.class); final AtomicReference diadem = new AtomicReference<>(); - final Set observers = new TreeSet<>(); + final Set observers = new ConcurrentSkipListSet<>(); private final AtomicInteger attempt = new AtomicInteger(); private final Digest bootstrapView; private final DynamicContext context; @@ -288,6 +285,11 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time .toList(), from, responseObserver, timer); return; } + if (!observers.contains(node.getId())) { + log.warn("Join not observer! from: {} observers: {} on: {}", from, observers, node.getId()); + responseObserver.onNext(Gateway.getDefaultInstance()); + return; + } if (!thisView.equals(joinView)) { responseObserver.onError(new StatusRuntimeException( Status.OUT_OF_RANGE.withDescription("View: " + joinView + " does not match: " + thisView))); @@ -376,8 +378,8 @@ boolean joined() { */ void maybeViewChange() { if (context.size() == 1 && joins.size() < context.getRingCount() - 1) { - log.info("Do not have minimum cluster size: {} required: {} for: {} on: {}", joins.size() + context.size(), - 4, currentView(), node.getId()); + log.info("Cannot form cluster: {} with: {} members, required:4 on: {}", currentView(), + joins.size() + context.size(), node.getId()); view.scheduleViewChange(); return; } @@ -390,7 +392,7 @@ void maybeViewChange() { // Use pending rebuttals as a proxy for stability if (view.hasPendingRebuttals()) { log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId()); - view.scheduleViewChange(1); // 1 TTL round to check again + view.scheduleViewChange(2); // 2 TTL round2 to check again } else { view.scheduleFinalizeViewChange(); } @@ -474,8 +476,8 @@ Redirect seed(Registration registration, Digest from) { final var introductions = observers.stream().map(context::getMember).toList(); - log.debug("Member seeding: {} view: {} context: {} introductions: {} on: {}", newMember.getId(), - currentView(), context.getId(), introductions.size(), node.getId()); + log.info("Member seeding: {} view: {} context: {} introductions: {} on: {}", newMember.getId(), + currentView(), context.getId(), introductions.stream().map(p -> p.getId()).toList(), node.getId()); return Redirect.newBuilder() .setView(currentView().toDigeste()) .addAllIntroductions(introductions.stream() @@ -599,7 +601,7 @@ private void setDiadem(final HexBloom hex) { diadem.set(hex); currentView.set(diadem.get().compactWrapped()); resetObservers(); - log.debug("View: {} set diadem: {} observers: {} view: {} context: {} size: {} on: {}", context.getId(), + log.trace("View: {} set diadem: {} observers: {} view: {} context: {} size: {} on: {}", context.getId(), diadem.get().compactWrapped(), observers.stream().toList(), currentView(), context.getId(), context.size(), node.getId()); }