Skip to content

Commit

Permalink
better shuffling, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 18, 2024
1 parent e191df8 commit 4a47caa
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 18 deletions.
24 changes: 24 additions & 0 deletions cryptography/src/main/java/com/salesforce/apollo/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,30 @@
import java.security.PublicKey;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.salesforce.apollo.cryptography.QualifiedBase64.qb64;
import static java.util.stream.Collectors.toList;

/**
* @author hal.hildebrand
**/
public class Utils {

private static final Collector<?, ?, ?> SHUFFLER = Collectors.collectingAndThen(
Collectors.toCollection(ArrayList::new), list -> {
Collections.shuffle(list);
return list;
});

/**
* Copy the contents of the input stream to the output stream. It is the caller's responsibility to close the
* streams.
Expand Down Expand Up @@ -156,6 +168,11 @@ public static BcX500NameDnImpl encode(Digest digest, String host, int port, Publ
String.format("CN=%s, L=%s, UID=%s, DC=%s", host, port, qb64(digest), qb64(signingKey)));
}

@SuppressWarnings("unchecked")
public static <T> Collector<T, ?, List<T>> toShuffledList() {
return (Collector<T, ?, List<T>>) SHUFFLER;
}

/**
* Find a free port for any local address
*
Expand All @@ -165,6 +182,13 @@ public static int allocatePort() {
return allocatePort(null);
}

public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
return Collectors.collectingAndThen(toList(), list -> {
Collections.shuffle(list);
return list.stream();
});
}

/**
* Find a free port on the interface with the given address
*
Expand Down
41 changes: 24 additions & 17 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,18 +421,17 @@ void notifyListeners(List<SelfAddressingIdentifier> joining, List<Digest> leavin
final var viewChange = new ViewChange(context.asStatic(), currentView(),
joining.stream().map(SelfAddressingIdentifier::getDigest).toList(),
Collections.unmodifiableList(leaving));
viewNotificationQueue.execute(Utils.wrapped(() -> {
viewChangeListeners.entrySet().forEach(entry -> {
viewChangeListeners.forEach((key, value) -> {
viewNotificationQueue.execute(Utils.wrapped(() -> {
try {
log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ",
entry.getKey(), currentView(), context.size(), joining.size(), leaving.size(),
node.getId());
entry.getValue().accept(viewChange);
log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", key,
currentView(), context.size(), joining.size(), leaving.size(), node.getId());
value.accept(viewChange);
} catch (Throwable e) {
log.error("error in view change listener: {} on: {} ", entry.getKey(), node.getId(), e);
log.error("error in view change listener: {} on: {} ", key, node.getId(), e);
}
});
}, log));
}, log));
});
}

/**
Expand Down Expand Up @@ -1011,6 +1010,7 @@ private BloomFilter<Digest> getAccusationsBff(long seed, double p) {
context.allMembers()
.flatMap(Participant::getAccusations)
.filter(Objects::nonNull)
.collect(Utils.toShuffledList())
.forEach(m -> bff.add(m.getHash()));
return bff;
}
Expand All @@ -1035,7 +1035,7 @@ private BloomFilter<Digest> getNotesBff(long seed, double p) {
private BloomFilter<Digest> getObservationsBff(long seed, double p) {
BloomFilter<Digest> bff = new BloomFilter.DigestBloomFilter(seed, Math.max(params.minimumBiffCardinality(),
context.cardinality() * 2), p);
observations.keySet().forEach(d -> bff.add(d));
observations.keySet().stream().collect(Utils.toShuffledList()).forEach(bff::add);
return bff;
}

Expand Down Expand Up @@ -1175,7 +1175,9 @@ private AccusationGossip.Builder processAccusations(BloomFilter<Digest> bff) {
// bff
var current = currentView();
context.allMembers()
.flatMap(m -> m.getAccusations())
.flatMap(Participant::getAccusations)
.collect(Utils.toShuffledList())
.stream()
.filter(m -> current.equals(m.currentView()))
.filter(a -> !bff.contains(a.getHash()))
.forEach(a -> builder.addUpdates(a.getWrapped()));
Expand Down Expand Up @@ -1212,9 +1214,9 @@ private NoteGossip.Builder processNotes(BloomFilter<Digest> bff) {
.filter(m -> current.equals(m.getNote().currentView()))
.filter(m -> !shunned.contains(m.getId()))
.filter(m -> !bff.contains(m.getNote().getHash()))
.map(m -> m.getNote())
// .limit(params.maximumTxfr()) // Always in sorted order with this method
.collect(new ReservoirSampler<>(params.maximumTxfr(), Entropy.bitsStream()))
.stream()
.map(m -> m.getNote())
.forEach(n -> builder.addUpdates(n.getWrapped()));
return builder;
}
Expand Down Expand Up @@ -1243,6 +1245,8 @@ private ViewChangeGossip.Builder processObservations(BloomFilter<Digest> bff) {
// Add all updates that this view has that aren't reflected in the inbound bff
final var current = currentView();
observations.entrySet()
.stream()
.collect(Utils.toShuffledList())
.stream()
.filter(e -> Digest.from(e.getValue().getChange().getCurrent()).equals(current))
.filter(m -> !bff.contains(m.getKey()))
Expand Down Expand Up @@ -1339,16 +1343,17 @@ private Update updatesForDigests(Gossip gossip) {
.filter(m -> m.getNote() != null)
.filter(m -> current.equals(m.getNote().currentView()))
.filter(m -> !notesBff.contains(m.getNote().getHash()))
.map(m -> m.getNote().getWrapped())
.collect(new ReservoirSampler<>(params.maximumTxfr(), Entropy.bitsStream()))
.forEach(n -> builder.addNotes(n));
.stream()
.map(m -> m.getNote().getWrapped())
.forEach(builder::addNotes);
}

biff = gossip.getAccusations().getBff();
if (!biff.equals(Biff.getDefaultInstance())) {
BloomFilter<Digest> accBff = BloomFilter.from(biff);
context.allMembers()
.flatMap(m -> m.getAccusations())
.flatMap(Participant::getAccusations)
.filter(a -> a.currentView().equals(current))
.filter(a -> !accBff.contains(a.getHash()))
.forEach(a -> builder.addAccusations(a.getWrapped()));
Expand All @@ -1358,6 +1363,8 @@ private Update updatesForDigests(Gossip gossip) {
if (!biff.equals(Biff.getDefaultInstance())) {
BloomFilter<Digest> obsvBff = BloomFilter.from(biff);
observations.entrySet()
.stream()
.collect(Utils.toShuffledList())
.stream()
.filter(e -> Digest.from(e.getValue().getChange().getCurrent()).equals(current))
.filter(e -> !obsvBff.contains(e.getKey()))
Expand Down Expand Up @@ -1548,7 +1555,7 @@ assert isValidMask(mask, context) : "Invalid mask: " + mask + " majority: " + co
}
}

// Fill the rest of the mask with randomly set index
// Fill the rest of the mask with randomly-set index

while (mask.cardinality() != ((context.getBias() - 1) * context.toleranceLevel()) + 1) {
int index = Entropy.nextBitsStreamInt(context.getRingCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public void clear() {

public BloomFilter<Digest> forReconcilliation() {
var biff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), params.bufferSize, params.falsePositiveRate);
state.keySet().forEach(k -> biff.add(k));
state.keySet().stream().collect(Utils.toShuffledList()).forEach(k -> biff.add(k));
return biff;
}

Expand All @@ -491,6 +491,8 @@ public void receive(List<AgedMessage> messages) {
public Iterable<? extends AgedMessage> reconcile(BloomFilter<Digest> biff, Digest from) {
PriorityQueue<AgedMessage.Builder> mailBox = new PriorityQueue<>(Comparator.comparingInt(s -> s.getAge()));
state.values()
.stream()
.collect(Utils.toShuffledList())
.stream()
.filter(s -> !biff.contains(s.hash))
.filter(s -> s.msg.getAge() < maxAge)
Expand Down

0 comments on commit 4a47caa

Please sign in to comment.