diff --git a/cryptography/src/main/java/com/salesforce/apollo/utils/Utils.java b/cryptography/src/main/java/com/salesforce/apollo/utils/Utils.java index 534bf2dca..afdc7329a 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/utils/Utils.java +++ b/cryptography/src/main/java/com/salesforce/apollo/utils/Utils.java @@ -18,18 +18,30 @@ import java.security.PublicKey; import java.security.cert.X509Certificate; import java.time.Instant; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.Callable; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.salesforce.apollo.cryptography.QualifiedBase64.qb64; +import static java.util.stream.Collectors.toList; /** * @author hal.hildebrand **/ public class Utils { + private static final Collector SHUFFLER = Collectors.collectingAndThen( + Collectors.toCollection(ArrayList::new), list -> { + Collections.shuffle(list); + return list; + }); + /** * Copy the contents of the input stream to the output stream. It is the caller's responsibility to close the * streams. @@ -156,6 +168,11 @@ public static BcX500NameDnImpl encode(Digest digest, String host, int port, Publ String.format("CN=%s, L=%s, UID=%s, DC=%s", host, port, qb64(digest), qb64(signingKey))); } + @SuppressWarnings("unchecked") + public static Collector> toShuffledList() { + return (Collector>) SHUFFLER; + } + /** * Find a free port for any local address * @@ -165,6 +182,13 @@ public static int allocatePort() { return allocatePort(null); } + public static Collector> toEagerShuffledStream() { + return Collectors.collectingAndThen(toList(), list -> { + Collections.shuffle(list); + return list.stream(); + }); + } + /** * Find a free port on the interface with the given address * diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index bc4f92a3b..eef938c7a 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -421,18 +421,17 @@ void notifyListeners(List joining, List leavin final var viewChange = new ViewChange(context.asStatic(), currentView(), joining.stream().map(SelfAddressingIdentifier::getDigest).toList(), Collections.unmodifiableList(leaving)); - viewNotificationQueue.execute(Utils.wrapped(() -> { - viewChangeListeners.entrySet().forEach(entry -> { + viewChangeListeners.forEach((key, value) -> { + viewNotificationQueue.execute(Utils.wrapped(() -> { try { - log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", - entry.getKey(), currentView(), context.size(), joining.size(), leaving.size(), - node.getId()); - entry.getValue().accept(viewChange); + log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", key, + currentView(), context.size(), joining.size(), leaving.size(), node.getId()); + value.accept(viewChange); } catch (Throwable e) { - log.error("error in view change listener: {} on: {} ", entry.getKey(), node.getId(), e); + log.error("error in view change listener: {} on: {} ", key, node.getId(), e); } - }); - }, log)); + }, log)); + }); } /** @@ -1011,6 +1010,7 @@ private BloomFilter getAccusationsBff(long seed, double p) { context.allMembers() .flatMap(Participant::getAccusations) .filter(Objects::nonNull) + .collect(Utils.toShuffledList()) .forEach(m -> bff.add(m.getHash())); return bff; } @@ -1035,7 +1035,7 @@ private BloomFilter getNotesBff(long seed, double p) { private BloomFilter getObservationsBff(long seed, double p) { BloomFilter bff = new BloomFilter.DigestBloomFilter(seed, Math.max(params.minimumBiffCardinality(), context.cardinality() * 2), p); - observations.keySet().forEach(d -> bff.add(d)); + observations.keySet().stream().collect(Utils.toShuffledList()).forEach(bff::add); return bff; } @@ -1175,7 +1175,9 @@ private AccusationGossip.Builder processAccusations(BloomFilter bff) { // bff var current = currentView(); context.allMembers() - .flatMap(m -> m.getAccusations()) + .flatMap(Participant::getAccusations) + .collect(Utils.toShuffledList()) + .stream() .filter(m -> current.equals(m.currentView())) .filter(a -> !bff.contains(a.getHash())) .forEach(a -> builder.addUpdates(a.getWrapped())); @@ -1212,9 +1214,9 @@ private NoteGossip.Builder processNotes(BloomFilter bff) { .filter(m -> current.equals(m.getNote().currentView())) .filter(m -> !shunned.contains(m.getId())) .filter(m -> !bff.contains(m.getNote().getHash())) - .map(m -> m.getNote()) - // .limit(params.maximumTxfr()) // Always in sorted order with this method .collect(new ReservoirSampler<>(params.maximumTxfr(), Entropy.bitsStream())) + .stream() + .map(m -> m.getNote()) .forEach(n -> builder.addUpdates(n.getWrapped())); return builder; } @@ -1243,6 +1245,8 @@ private ViewChangeGossip.Builder processObservations(BloomFilter bff) { // Add all updates that this view has that aren't reflected in the inbound bff final var current = currentView(); observations.entrySet() + .stream() + .collect(Utils.toShuffledList()) .stream() .filter(e -> Digest.from(e.getValue().getChange().getCurrent()).equals(current)) .filter(m -> !bff.contains(m.getKey())) @@ -1339,16 +1343,17 @@ private Update updatesForDigests(Gossip gossip) { .filter(m -> m.getNote() != null) .filter(m -> current.equals(m.getNote().currentView())) .filter(m -> !notesBff.contains(m.getNote().getHash())) - .map(m -> m.getNote().getWrapped()) .collect(new ReservoirSampler<>(params.maximumTxfr(), Entropy.bitsStream())) - .forEach(n -> builder.addNotes(n)); + .stream() + .map(m -> m.getNote().getWrapped()) + .forEach(builder::addNotes); } biff = gossip.getAccusations().getBff(); if (!biff.equals(Biff.getDefaultInstance())) { BloomFilter accBff = BloomFilter.from(biff); context.allMembers() - .flatMap(m -> m.getAccusations()) + .flatMap(Participant::getAccusations) .filter(a -> a.currentView().equals(current)) .filter(a -> !accBff.contains(a.getHash())) .forEach(a -> builder.addAccusations(a.getWrapped())); @@ -1358,6 +1363,8 @@ private Update updatesForDigests(Gossip gossip) { if (!biff.equals(Biff.getDefaultInstance())) { BloomFilter obsvBff = BloomFilter.from(biff); observations.entrySet() + .stream() + .collect(Utils.toShuffledList()) .stream() .filter(e -> Digest.from(e.getValue().getChange().getCurrent()).equals(current)) .filter(e -> !obsvBff.contains(e.getKey())) @@ -1548,7 +1555,7 @@ assert isValidMask(mask, context) : "Invalid mask: " + mask + " majority: " + co } } - // Fill the rest of the mask with randomly set index + // Fill the rest of the mask with randomly-set index while (mask.cardinality() != ((context.getBias() - 1) * context.toleranceLevel()) + 1) { int index = Entropy.nextBitsStreamInt(context.getRingCount()); diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index ca923b3f6..97698686f 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -466,7 +466,7 @@ public void clear() { public BloomFilter forReconcilliation() { var biff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), params.bufferSize, params.falsePositiveRate); - state.keySet().forEach(k -> biff.add(k)); + state.keySet().stream().collect(Utils.toShuffledList()).forEach(k -> biff.add(k)); return biff; } @@ -491,6 +491,8 @@ public void receive(List messages) { public Iterable reconcile(BloomFilter biff, Digest from) { PriorityQueue mailBox = new PriorityQueue<>(Comparator.comparingInt(s -> s.getAge())); state.values() + .stream() + .collect(Utils.toShuffledList()) .stream() .filter(s -> !biff.contains(s.hash)) .filter(s -> s.msg.getAge() < maxAge)