diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 104d086bc..10a1b8fc7 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -210,10 +210,9 @@ public static String print(Join join, DigestAlgorithm da) { } public static Reconfigure reconfigure(Digest nextViewId, Map joins, int checkpointTarget) { + assert Dag.validate(joins.size()) : "Reconfigure joins: %s is not BFT".formatted(joins.size()); var builder = Reconfigure.newBuilder().setCheckpointTarget(checkpointTarget).setId(nextViewId.toDigeste()); - joins.keySet().stream().sorted().map(joins::get).forEach(builder::addJoins); - return builder.build(); } @@ -232,13 +231,6 @@ public static Block reconfigure(Digest nextViewId, Map joins, Hash .build(); } - public static Map rosterMap(Context baseContext, Collection members) { - return members.stream() - .map(baseContext::getMember) - .filter(m -> m != null) - .collect(Collectors.toMap(Member::getId, Function.identity())); - } - public static List toGenesisData(List initializationData) { return toGenesisData(initializationData, DigestAlgorithm.DEFAULT, SignatureAlgorithm.DEFAULT); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java index ecc92f48e..eb318320a 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java @@ -15,6 +15,7 @@ import com.salesforce.apollo.cryptography.JohnHancock; import com.salesforce.apollo.cryptography.Verifier; import com.salesforce.apollo.cryptography.Verifier.DefaultVerifier; +import com.salesforce.apollo.ethereal.Dag; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.MockMember; import io.grpc.StatusRuntimeException; @@ -35,6 +36,8 @@ public interface 
Committee { static Map validatorsOf(Reconfigure reconfigure, Context context, Digest member, Logger log) { + assert Dag.validate(reconfigure.getJoinsCount()) : "Reconfigure joins: %s is not BFT".formatted( + reconfigure.getJoinsCount()); var validators = reconfigure.getJoinsList().stream().collect(Collectors.toMap(e -> { var id = new Digest(e.getMember().getVm().getId()); var m = context.getMember(id); @@ -47,7 +50,6 @@ static Map validatorsOf(Reconfigure reconfigure, Context { var vm = e.getMember().getVm(); if (vm.hasConsensusKey()) { - return new DefaultVerifier(publicKey(vm.getConsensusKey())); } else { log.info("No member for validator: {}, returning mock on: {}", Digest.from(vm.getId()), member); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index fb003c225..9538fe8ce 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -21,6 +21,7 @@ import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.ethereal.Config.Builder; +import com.salesforce.apollo.ethereal.Dag; import com.salesforce.apollo.ethereal.Ethereal; import com.salesforce.apollo.ethereal.memberships.ChRbcGossip; import com.salesforce.apollo.membership.Member; @@ -325,8 +326,8 @@ private void publish(PendingBlock p, boolean beacon) { private void reconfigure() { final var slate = assembly.getSlate(); - assert slate != null && !slate.isEmpty() : "Slate is incorrect: %s".formatted( - slate.keySet().stream().sorted().toList()); + assert slate != null && !slate.isEmpty() : slate == null ? 
"Slate is null" : "Slate is empty"; + assert Dag.validate(slate.size()) : "Reconfigure joins: %s is not BFT".formatted(slate.size()); var reconfiguration = new HashedBlock(params().digestAlgorithm(), view.reconfigure(slate, nextViewId, previousBlock.get(), checkpoint.get())); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index cf31987be..98a510a8c 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -18,6 +18,7 @@ import com.salesforce.apollo.cryptography.JohnHancock; import com.salesforce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.cryptography.proto.PubKey; +import com.salesforce.apollo.ethereal.Dag; import com.salesforce.apollo.membership.Member; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,9 +143,9 @@ void assemble(List asses) { } views.forEach(svs -> { if (view.validate(svs)) { - log.info("Adding views: {} from: {} on: {}", - svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), - Digest.from(svs.getViews().getMember()), params().member().getId()); + log.debug("Adding views: {} from: {} on: {}", + svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + Digest.from(svs.getViews().getMember()), params().member().getId()); viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews()); } else { log.warn("Invalid views: {} from: {} on: {}", @@ -377,6 +378,8 @@ private void vote() { params().member().getId()); } var winner = ratification.getFirst(); + assert Dag.validate(winner.getCommitteeCount()) : "Winner committee: %s is not BFT".formatted( + winner.getCommitteeList().size()); selected = new Vue(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList()), winner.getMajority()); if (log.isDebugEnabled()) { @@ -408,8 +411,8 @@ public void 
certify() { transitions.certified(); } else { countdown.set(4); - log.info("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, - proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); + log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, + proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); } } @@ -420,7 +423,7 @@ public void checkAssembly() { if (proposals.size() >= selected.majority) { transitions.chill(); } else { - log.info("Check assembly: {} on: {}", proposals.size(), params().member().getId()); + log.trace("Check assembly: {} on: {}", proposals.size(), params().member().getId()); } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index 46a0c9d71..4f7a71d1f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey; @@ -48,11 +49,10 @@ public ViewContext(Context context, Parameters params, Supplier { + roster.put(m.getId(), (short) pid.getAndIncrement()); + }); } public static String print(Certification c, DigestAlgorithm algo) { diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 2bbe4fe53..c8f84fdb5 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -152,7 +152,13 @@ private void complete(Member member, CompletableFuture gateway, HashMulti } catch 
(InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { - log.warn("Error retrieving Gateway from: {} on: {}", member.getId(), node.getId(), e.getCause()); + var cause = e.getCause(); + if (cause instanceof StatusRuntimeException sre) { + log.warn("Error retrieving Gateway: {} from: {} on: {}", sre.getMessage(), member.getId(), + node.getId()); + } else { + log.error("Error retrieving Gateway from: {} on: {}", member.getId(), node.getId(), cause); + } dec(complete, remaining); return; } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 7a1ded956..044d8e854 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -83,9 +83,10 @@ * @since 220 */ public class View { - private static final String FINALIZE_VIEW_CHANGE = "FINALIZE VIEW CHANGE"; + private static final String FINALIZE_VIEW_CHANGE = "Finalize View Change"; private static final Logger log = LoggerFactory.getLogger(View.class); private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change"; + private static final String CLEAR_OBSERVATIONS = "Clear Observations"; final CommonCommunications comm; final AtomicBoolean started = new AtomicBoolean(); @@ -98,7 +99,7 @@ public class View { private final Executor viewNotificationQueue; private final FireflyMetrics metrics; private final Node node; - private final Map observations = new ConcurrentSkipListMap<>(); + private final Map observations = new ConcurrentSkipListMap<>(); private final Parameters params; private final ConcurrentMap pendingRebuttals = new ConcurrentSkipListMap<>(); private final RoundScheduler roundTimers; @@ -109,6 +110,7 @@ public class View { private final EventValidation validation; private final Verifiers verifiers; private volatile ScheduledFuture futureGossip; + private volatile boolean 
bootstrap = false; public View(DynamicContext context, ControlledIdentifierMember member, String endpoint, EventValidation validation, Verifiers verifiers, Router communications, Parameters params, @@ -325,6 +327,7 @@ boolean addToView(NoteWrapper note) { } void bootstrap(NoteWrapper nw, Duration dur) { + bootstrap = true; viewManagement.bootstrap(nw, dur); } @@ -344,45 +347,23 @@ void finalizeViewChange() { return; } viewChange(() -> { - final var current = currentView(); + removeTimer(View.FINALIZE_VIEW_CHANGE); final var supermajority = context.getRingCount() * 3 / 4; final var majority = context.size() == 1 ? 1 : supermajority; - final var valid = observations.values() - .stream() - .filter(vc -> current.equals(Digest.from(vc.getChange().getCurrent()))) - .filter(svc -> viewManagement.observers.contains( - Digest.from(svc.getChange().getObserver()))) - .toList(); - log.trace("Finalize view change, observations: {} valid: {} observers: {} on: {}", - observations.values().stream().map(sv -> Digest.from(sv.getChange().getObserver())).toList(), - valid.size(), viewManagement.observersList(), node.getId()); - observations.clear(); - if (valid.size() < majority) { - log.info("Do not have majority: {} required: {} observers: {} for: {} on: {}", valid.size(), majority, - viewManagement.observersList(), currentView(), node.getId()); - scheduleFinalizeViewChange(2); + log.info("Finalize view change, observations: {} observers: {} on: {}", + observations.keySet().stream().toList(), viewManagement.observersList(), node.getId()); + if (observations.size() < majority) { + log.info("Do not have majority: {} required: {} observers: {} for: {} on: {}", observations.size(), + majority, viewManagement.observersList(), currentView(), node.getId()); + scheduleFinalizeViewChange(1); return; } log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), majority, viewManagement.observersList(), currentView(), node.getId()); HashMultiset ballots = 
HashMultiset.create(); - valid.forEach(vc -> { - final var leaving = vc.getChange() - .getLeavesList() - .stream() - .map(Digest::from) - .distinct() - .collect(Collectors.toCollection(ArrayList::new)); - final var joining = vc.getChange() - .getJoinsList() - .stream() - .map(Digest::from) - .distinct() - .collect(Collectors.toCollection(ArrayList::new)); - leaving.sort(Ordering.natural()); - joining.sort(Ordering.natural()); - ballots.add(new Ballot(Digest.from(vc.getChange().getCurrent()), leaving, joining, digestAlgo)); - }); + observations.values().forEach(svu -> tally(svu, ballots)); + viewManagement.clearVote(); + scheduleClearObservations(); var max = ballots.entrySet() .stream() .max(Ordering.natural().onResultOf(Multiset.Entry::getCount)) @@ -391,17 +372,15 @@ void finalizeViewChange() { log.info("View consensus successful: {} required: {} cardinality: {} for: {} on: {}", max, majority, viewManagement.cardinality(), currentView(), node.getId()); viewManagement.install(max.getElement()); + scheduleViewChange(); } else { @SuppressWarnings("unchecked") final var reversed = Comparator.comparing(e -> ((Entry) e).getCount()).reversed(); log.info("View consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}", max == null ? 
0 : max.getCount(), majority, viewManagement.cardinality(), ballots.entrySet().stream().sorted(reversed).toList(), currentView(), node.getId()); + viewManagement.initiateViewChange(); } - - scheduleViewChange(); - removeTimer(View.FINALIZE_VIEW_CHANGE); - viewManagement.clearVote(); }); } @@ -419,7 +398,7 @@ boolean hasPendingRebuttals() { } void initiate(SignedViewChange viewChange) { - observations.put(node.getId(), viewChange); + observations.put(node.getId(), new SVU(viewChange, digestAlgo)); } void introduced() { @@ -521,6 +500,13 @@ void schedule(final Duration duration) { Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS); } + void scheduleClearObservations() { + if (!started.get()) { + return; + } + timers.put(CLEAR_OBSERVATIONS, roundTimers.schedule(CLEAR_OBSERVATIONS, () -> observations.clear(), 1)); + } + void scheduleFinalizeViewChange() { scheduleFinalizeViewChange(params.finalizeViewRounds()); } @@ -582,7 +568,7 @@ void stopRebuttalTimer(Participant m) { m.clearAccusations(); var timer = pendingRebuttals.remove(m.getId()); if (timer != null) { - log.debug("Cancelling accusation of: {} on: {}", m.getId(), node.getId()); + log.info("Cancelling accusation of: {} on: {}", m.getId(), node.getId()); timer.cancel(); } } @@ -691,8 +677,8 @@ private void accuse(Participant member, int ring, Throwable e) { member.addAccusation(node.accuse(member, ring)); pendingRebuttals.computeIfAbsent(member.getId(), d -> roundTimers.schedule(() -> gc(member), params.rebuttalTimeout())); - log.debug("Accuse: {} on ring: {} view: {} (timer started): {} on: {}", member.getId(), ring, currentView(), - e.getMessage(), node.getId()); + log.info("Accuse: {} on ring: {} view: {} (timer started): {} on: {}", member.getId(), ring, currentView(), + e.getMessage(), node.getId()); } /** @@ -754,15 +740,15 @@ private boolean add(AccusationWrapper accusation, Participant accuser, Participa accused)) { if (!accused.verify(accusation.getSignature(), 
accusation.getWrapped().getAccusation().toByteString())) { - log.trace("Accusation discarded, accusation by: {} accused:{} signature invalid on: {}", + log.debug("Accusation discarded, accusation by: {} accused:{} signature invalid on: {}", accuser.getId(), accused.getId(), node.getId()); return false; } accused.addAccusation(accusation); pendingRebuttals.computeIfAbsent(accused.getId(), d -> roundTimers.schedule(() -> gc(accused), params.rebuttalTimeout())); - log.debug("{} accused by: {} on ring: {} (replacing: {}) on: {}", accused.getId(), accuser.getId(), - accusation.getRingNumber(), currentAccuser.getId(), node.getId()); + log.info("{} accused by: {} on ring: {} (replacing: {}) on: {}", accused.getId(), accuser.getId(), + accusation.getRingNumber(), currentAccuser.getId(), node.getId()); if (metrics != null) { metrics.accusations().mark(); } @@ -790,8 +776,8 @@ private boolean add(AccusationWrapper accusation, Participant accuser, Participa if (accuser.equals(predecessor)) { accused.addAccusation(accusation); if (!accused.equals(node) && !pendingRebuttals.containsKey(accused.getId())) { - log.debug("{} accused by: {} on ring: {} (timer started) on: {}", accused.getId(), accuser.getId(), - accusation.getRingNumber(), node.getId()); + log.info("{} accused by: {} on ring: {} (timer started) on: {}", accused.getId(), accuser.getId(), + accusation.getRingNumber(), node.getId()); pendingRebuttals.computeIfAbsent(accused.getId(), d -> roundTimers.schedule(() -> gc(accused), params.rebuttalTimeout())); } @@ -841,41 +827,44 @@ private boolean add(NoteWrapper note) { * @param observation */ private boolean add(SignedViewChange observation) { - final Digest observer = Digest.from(observation.getChange().getObserver()); - if (!viewManagement.isObserver(observer)) { - log.trace("Invalid observer: {} current: {} on: {}", observer, currentView(), node.getId()); + var svu = new SVU(observation, digestAlgo); + if (!viewManagement.isObserver(svu.observer)) { + 
log.trace("Invalid observer: {} current: {} on: {}", svu.observer, currentView(), node.getId()); return false; } final var inView = Digest.from(observation.getChange().getCurrent()); if (!currentView().equals(inView)) { - log.trace("Invalid view change: {} current: {} from {} on: {}", inView, currentView(), observer, + log.trace("Invalid view change: {} current: {} from {} on: {}", inView, currentView(), svu.observer, node.getId()); return false; } - var currentObservation = observations.get(observer); - if (currentObservation != null) { - if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) { - log.trace("Stale observation: {} current: {} view change: {} current: {} offline: {} on: {}", - observation.getChange().getAttempt(), currentObservation.getChange().getAttempt(), inView, - currentView(), observer, node.getId()); - return false; - } - } - final var member = context.getActiveMember(observer); + final var member = context.getActiveMember(svu.observer); if (member == null) { - log.trace("Cannot validate view change: {} current: {} from: {} on: {}", inView, currentView(), observer, + log.trace("Cannot validate view change: {} current: {} from: {} on: {}", inView, currentView(), + svu.observer, node.getId()); + return false; + } + if (!viewManagement.isObserver(member.id)) { + log.trace("Not an observer of: {} current: {} from: {} on: {}", inView, currentView(), svu.observer, node.getId()); return false; } - return observations.computeIfAbsent(observer.prefix(observation.getChange().getAttempt()), p -> { - final var signature = JohnHancock.from(observation.getSignature()); - if (!member.verify(signature, observation.getChange().toByteString())) { - return null; + final var signature = JohnHancock.from(observation.getSignature()); + if (!member.verify(signature, observation.getChange().toByteString())) { + return false; + } + return observations.compute(svu.observer, (d, cur) -> { + if (cur != null) { + if (svu.attempt < 
cur.attempt) { + log.trace("Stale observation: {} current: {} view change: {} current: {} offline: {} on: {}", + svu.attempt, cur.attempt, inView, currentView(), svu.observer, node.getId()); + return cur; + } } - log.trace("Observation: {} current: {} view change: {} from: {} on: {}", - observation.getChange().getAttempt(), inView, currentView(), observer, node.getId()); - return observation; - }) != null; + log.trace("Observation: {} current: {} view change: {} from: {} on: {}", svu.attempt, inView, currentView(), + svu.observer, node.getId()); + return svu; + }) == svu; } private boolean addJoin(SignedNote sn) { @@ -1049,7 +1038,7 @@ private BloomFilter getNotesBff(long seed, double p) { private BloomFilter getObservationsBff(long seed, double p) { var n = Math.max(params.minimumBiffCardinality(), observations.size()); BloomFilter bff = new BloomFilter.DigestBloomFilter(seed, n, 1.0 / (double) n); - observations.keySet().stream().collect(Utils.toShuffledList()).forEach(bff::add); + observations.values().stream().map(svu -> svu.hash).collect(Utils.toShuffledList()).forEach(bff::add); return bff; } @@ -1263,13 +1252,12 @@ private ViewChangeGossip.Builder processObservations(BloomFilter bff) { // Add all updates that this view has that aren't reflected in the inbound bff final var current = currentView(); - observations.entrySet() + observations.values() .stream() .collect(Utils.toShuffledList()) .stream() - .filter(e -> Digest.from(e.getValue().getChange().getCurrent()).equals(current)) - .filter(m -> !bff.contains(m.getKey())) - .map(m -> m.getValue()) + .filter(svu -> !bff.contains(svu.hash)) + .map(svu -> svu.viewChange) .forEach(n -> builder.addUpdates(n)); return builder; } @@ -1344,6 +1332,25 @@ private Update response(Gossip gossip) { return updatesForDigests(gossip); } + private void tally(SVU svu, HashMultiset ballots) { + var vc = svu.viewChange; + final var leaving = vc.getChange() + .getLeavesList() + .stream() + .map(Digest::from) + .distinct() + 
.collect(Collectors.toCollection(ArrayList::new)); + final var joining = vc.getChange() + .getJoinsList() + .stream() + .map(Digest::from) + .distinct() + .collect(Collectors.toCollection(ArrayList::new)); + leaving.sort(Ordering.natural()); + joining.sort(Ordering.natural()); + ballots.add(new Ballot(Digest.from(vc.getChange().getCurrent()), leaving, joining, digestAlgo)); + } + /** * Process the gossip reply. Return the gossip with the updates determined from the inbound digests. * @@ -1380,13 +1387,12 @@ private Update updatesForDigests(Gossip gossip) { biff = gossip.getObservations().getBff(); if (!biff.equals(Biff.getDefaultInstance())) { BloomFilter obsvBff = BloomFilter.from(biff); - observations.entrySet() + observations.values() .stream() .collect(Utils.toShuffledList()) .stream() - .filter(e -> Digest.from(e.getValue().getChange().getCurrent()).equals(current)) - .filter(e -> !obsvBff.contains(e.getKey())) - .forEach(e -> builder.addObservations(e.getValue())); + .filter(svu -> !obsvBff.contains(svu.hash)) + .forEach(svu -> builder.addObservations(svu.viewChange)); } biff = gossip.getJoins().getBff(); @@ -1463,6 +1469,19 @@ private boolean verify(SelfAddressingIdentifier id, SigningThreshold threshold, return verifiers.verifierFor(id).map(value -> value.verify(threshold, signature, message)).orElse(false); } + private record SVU(Digest observer, SignedViewChange viewChange, int attempt, Digest hash) + implements Comparable { + public SVU(SignedViewChange signedViewChange, DigestAlgorithm algo) { + this(Digest.from(signedViewChange.getChange().getObserver()), signedViewChange, + signedViewChange.getChange().getAttempt(), algo.digest(signedViewChange.toByteString())); + } + + @Override + public int compareTo(SVU o) { + return Integer.compare(attempt, o.attempt); + } + } + public record Seed(SelfAddressingIdentifier identifier, String endpoint) { } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java 
b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index b4d33a528..2008a0c05 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -188,6 +188,48 @@ BloomFilter getJoinsBff(long seed, double p) { return bff; } + /** + * Initiate the view change + */ + void initiateViewChange() { + view.stable(() -> { + if (vote.get() != null) { + log.trace("Vote already cast for: {} on: {}", currentView(), node.getId()); + return; + } + // Use pending rebuttals as a proxy for stability + if (view.hasPendingRebuttals()) { + log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId()); + view.scheduleViewChange(1); + return; + } + view.scheduleFinalizeViewChange(); + if (!isObserver(node.getId())) { + log.debug("Initiating (non observer) view change: {} joins: {} leaves: {} on: {}", currentView(), + joins.size(), view.streamShunned().count(), node.getId()); + return; + } + log.debug("Initiating (observer) view change vote: {} joins: {} leaves: {} observers: {} on: {}", + currentView(), joins.size(), view.streamShunned().count(), observersList(), node.getId()); + final var builder = ViewChange.newBuilder() + .setObserver(node.getId().toDigeste()) + .setCurrent(currentView().toDigeste()) + .setAttempt(attempt.getAndIncrement()) + .addAllJoins(joins.keySet().stream().map(Digest::toDigeste).toList()); + view.streamShunned().map(Digest::toDigeste).forEach(builder::addLeaves); + ViewChange change = builder.build(); + vote.set(change); + var signature = node.sign(change.toByteString()); + final var viewChange = SignedViewChange.newBuilder() + .setChange(change) + .setSignature(signature.toSig()) + .build(); + view.initiate(viewChange); + log.trace("View change vote: {} joins: {} leaves: {} on: {}", currentView(), change.getJoinsCount(), + change.getLeavesCount(), node.getId()); + }); + } + /** * Install the new view * @@ 
-427,17 +469,22 @@ void maybeViewChange() { if (!joined()) { return; } - if (context.size() == 1 && joins.size() < context.getRingCount() - 1) { - log.trace("Cannot form cluster: {} with: {} members, required > 3 on: {}", currentView(), - joins.size() + context.size(), node.getId()); + if (bootstrap && context.size() == 1 && joins.size() < context.getRingCount() - 1) { + log.trace("Cannot form cluster: {} with: {} members, required >= {} on: {}", currentView(), + joins.size() + context.size(), context.getRingCount(), node.getId()); view.scheduleViewChange(); return; + } else if (!bootstrap) { + if (context.size() < context.getRingCount()) { + log.trace("Cannot initiate view change: {} with: {} members, required >= {} on: {}", currentView(), + joins.size() + context.size(), context.getRingCount(), node.getId()); + view.scheduleViewChange(); + return; + } } if ((context.offlineCount() > 0 || !joins.isEmpty())) { initiateViewChange(); } else { - // log.trace("No view change: {} joins: {} leaves: {} on: {}", currentView(), joins.size(), - // view.streamShunned().count(), node.getId()); view.scheduleViewChange(); } } @@ -556,48 +603,6 @@ void start(CompletableFuture onJoin, boolean bootstrap) { this.bootstrap = bootstrap; } - /** - * Initiate the view change - */ - private void initiateViewChange() { - view.stable(() -> { - if (vote.get() != null) { - log.trace("Vote already cast for: {} on: {}", currentView(), node.getId()); - return; - } - // Use pending rebuttals as a proxy for stability - if (view.hasPendingRebuttals()) { - log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId()); - view.scheduleViewChange(1); // 1 TTL round to check again - return; - } - view.scheduleFinalizeViewChange(); - if (!isObserver(node.getId())) { - log.debug("Initiating (non observer) view change: {} joins: {} leaves: {} on: {}", currentView(), - joins.size(), view.streamShunned().count(), node.getId()); - return; - } - log.debug("Initiating (observer) view change 
vote: {} joins: {} leaves: {} observers: {} on: {}", - currentView(), joins.size(), view.streamShunned().count(), observersList(), node.getId()); - final var builder = ViewChange.newBuilder() - .setObserver(node.getId().toDigeste()) - .setCurrent(currentView().toDigeste()) - .setAttempt(attempt.getAndIncrement()) - .addAllJoins(joins.keySet().stream().map(Digest::toDigeste).toList()); - view.streamShunned().map(Digest::toDigeste).forEach(builder::addLeaves); - ViewChange change = builder.build(); - vote.set(change); - var signature = node.sign(change.toByteString()); - final var viewChange = SignedViewChange.newBuilder() - .setChange(change) - .setSignature(signature.toSig()) - .build(); - view.initiate(viewChange); - log.trace("View change vote: {} joins: {} leaves: {} on: {}", currentView(), change.getJoinsCount(), - change.getLeavesCount(), node.getId()); - }); - } - /** * @return true if the receiver is part of the BFT Observers of this group */ diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index b689e159d..913623451 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -152,7 +152,7 @@ public void churn() throws Exception { toStart.forEach(view -> view.start(() -> countdown.get().countDown(), gossipDuration, seeds)); - success = countdown.get().await(30, TimeUnit.SECONDS); + success = countdown.get().await(60, TimeUnit.SECONDS); failed = testViews.stream().filter(e -> { if (e.getContext().activeCount() != testViews.size()) return true;