Skip to content

Commit

Permalink
moar observation lifecycle fixie. don't "start/end" ViewAssembly
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jul 2, 2024
1 parent 7d22597 commit bd8393a
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 87 deletions.
176 changes: 114 additions & 62 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,36 @@
public class CHOAM {
private static final Logger log = LoggerFactory.getLogger(CHOAM.class);

private final Map<ULong, CheckpointState> cachedCheckpoints = new ConcurrentHashMap<>();
private final AtomicReference<HashedCertifiedBlock> checkpoint = new AtomicReference<>();
private final ReliableBroadcaster combine;
private final CommonCommunications<Terminal, Concierge> comm;
private final AtomicReference<Committee> current = new AtomicReference<>();
private final AtomicReference<CompletableFuture<SynchronizedState>> futureBootstrap = new AtomicReference<>();
private final AtomicReference<ScheduledFuture<?>> futureSynchronization = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> genesis = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> head = new AtomicReference<>();
private final AtomicReference<nextView> next = new AtomicReference<>();
private final AtomicReference<Digest> nextViewId = new AtomicReference<>();
private final Parameters params;
private final PriorityBlockingQueue<HashedCertifiedBlock> pending = new PriorityBlockingQueue<>();
private final RoundScheduler roundScheduler;
private final Session session;
private final AtomicBoolean started = new AtomicBoolean();
private final Store store;
private final CommonCommunications<TxnSubmission, Submitter> submissionComm;
private final Combine.Transitions transitions;
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final ScheduledExecutorService scheduler;
private final ExecutorService linear;
private final AtomicBoolean ongoingJoin = new AtomicBoolean();
private final Map<ULong, CheckpointState> cachedCheckpoints = new ConcurrentHashMap<>();
private final AtomicReference<HashedCertifiedBlock> checkpoint = new AtomicReference<>();
private final ReliableBroadcaster combine;
private final CommonCommunications<Terminal, Concierge> comm;
private final AtomicReference<Committee> current = new AtomicReference<>();
private final AtomicReference<CompletableFuture<SynchronizedState>> futureBootstrap = new AtomicReference<>();
private final AtomicReference<ScheduledFuture<?>> futureSynchronization = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> genesis = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> head = new AtomicReference<>();
private final AtomicReference<nextView> next = new AtomicReference<>();
private final AtomicReference<Digest> nextViewId = new AtomicReference<>();
private final Parameters params;
private final PriorityBlockingQueue<HashedCertifiedBlock> pending = new PriorityBlockingQueue<>();
private final RoundScheduler roundScheduler;
private final Session session;
private final AtomicBoolean started = new AtomicBoolean();
private final Store store;
private final CommonCommunications<TxnSubmission, Submitter> submissionComm;
private final Combine.Transitions transitions;
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final ScheduledExecutorService scheduler;
private final AtomicBoolean ongoingJoin = new AtomicBoolean();
private volatile Thread linear;

public CHOAM(Parameters params) {
scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
linear = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
pendingViews.add(params.context().getId(), params.context().delegate());

rotateViewKeys();
Expand Down Expand Up @@ -353,7 +352,11 @@ public void stop() {
if (!started.compareAndSet(true, false)) {
return;
}
linear.shutdown();
var l = linear;
linear = null;
if (l != null) {
l.interrupt();
}
try {
scheduler.shutdownNow();
} catch (Throwable e) {
Expand Down Expand Up @@ -434,39 +437,6 @@ private Block checkpoint() {
return block;
}

private void combine() {
var next = pending.peek();
log.trace("Attempting to combine blocks, peek: {} height: {}, head: {} height: {} on: {}",
next == null ? "<null>" : next.hash, next == null ? "-1" : next.height(), head.get().hash,
head.get().height(), params.member().getId());
while (next != null) {
final HashedCertifiedBlock h = head.get();
if (h.height() != null && next.height().compareTo(h.height()) <= 0) {
pending.poll();
} else if (isNext(next)) {
if (current.get().validate(next)) {
HashedCertifiedBlock nextBlock = pending.poll();
if (nextBlock == null) {
return;
}
accept(nextBlock);
} else {
log.debug("Unable to validate block: {} hash: {} height: {} on: {}", next.block.getBodyCase(),
next.hash, next.height(), params.member().getId());
pending.poll();
}
} else {
log.trace("Premature block: {} : {} height: {} current: {} on: {}", next.block.getBodyCase(), next.hash,
next.height(), h.height(), params.member().getId());
return;
}
next = pending.peek();
}

log.trace("Finished combined, head: {} height: {} on: {}", head.get().hash, head.get().height(),
params.member().getId());
}

private void combine(List<Msg> messages) {
messages.forEach(this::combine);
transitions.combine();
Expand Down Expand Up @@ -542,7 +512,7 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo
checkpoint.hash, v.height(), v.hash))
.setExecutions(executions)
.build();
log.trace("Produced block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(),
log.trace("Produce block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(),
params.member().getId());
return block;
}
Expand Down Expand Up @@ -573,6 +543,82 @@ public Block reconfigure(Map<Digest, Join> joining, Digest nextViewId, HashedBlo
};
}

private void consume(HashedCertifiedBlock next) {
log.trace("Attempting to consume: {} hash: {} height: {}, head: {} height: {} on: {}", next.block.getBodyCase(),
next.hash, next.height(), head.get().hash, head.get().height(), params.member().getId());
final HashedCertifiedBlock h = head.get();

if (h.height() != null && next.height().compareTo(h.height()) <= 0) {
// block already past tense
log.debug("Stale: {} hash: {} height: {} on: {}", next.block.getBodyCase(), next.hash, next.height(),
params.member().getId());
return;
}

final var nlc = ULong.valueOf(next.block.getHeader().getLastReconfig());

var view = this.view.get().height();
if (h.block == null || nlc.equals(view)) {
// same view
consume(next, h);
return;
}

if (nlc.compareTo(view) > 0) {
// later view
log.trace("Wait for reconfiguration @ {} block: {} hash: {} height: {} current: {} on: {}",
next.block.getHeader().getLastReconfig(), next.block.getBodyCase(), next.hash, next.height(),
h.height(), params.member().getId());
pending.add(next);
} else {
// invalid view
log.trace("Invalid view @ {} current: {} block: {} hash: {} height: {} current: {} on: {}", nlc, view,
next.block.getBodyCase(), next.hash, next.height(), h.height(), params.member().getId());
}
}

private void consume(HashedCertifiedBlock next, HashedCertifiedBlock cur) {
if (isNext(next)) {
if (current.get().validate(next)) {
log.trace("Accept: {} hash: {} height: {} on: {}", next.block.getBodyCase(), next.hash, next.height(),
params.member().getId());
accept(next);
} else {
log.debug("Invalid block: {} hash: {} height: {} on: {}", next.block.getBodyCase(), next.hash,
next.height(), params.member().getId());
}
} else {
log.trace("Premature block: {} : {} height: {} current: {} on: {}", next.block.getBodyCase(), next.hash,
next.height(), cur.height(), params.member().getId());
pending.add(next);
}
}

private void consumer() {
while (started.get()) {
HashedCertifiedBlock next = null;
try {
next = pending.poll(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
if (!started.get()) {
return;
}
if (next == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
continue;
}
consume(next);
}
}

private void execute(List<Transaction> execs) {
final var h = head.get();
log.info("Executing transactions for block: {} hash: {} height: {} txns: {} on: {}", h.block.getBodyCase(),
Expand Down Expand Up @@ -1196,7 +1242,13 @@ public void cancelTimer(String timer) {

@Override
public void combine() {
linear.execute(Utils.wrapped(() -> CHOAM.this.combine(), log));
final var current = linear;
if (current == null) {
log.trace("Combining Consumer for: {} on: {}", context().getId(), params.member().getId());
linear = Thread.ofVirtual()
.name("Linear[%s on: %s]".formatted(context().getId(), params.member().getId()))
.start(Utils.wrapped(CHOAM.this::consumer, log));
}
}

@Override
Expand Down
19 changes: 0 additions & 19 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -59,7 +58,6 @@ public class ViewAssembly {
private final CompletableFuture<Vue> onConsensus;
private final AtomicInteger countdown = new AtomicInteger();
private final List<SignedViewMember> pendingJoins = new CopyOnWriteArrayList<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private final Map<Digest, SignedJoin> joins = new ConcurrentHashMap<>();
private volatile Vue selected;

Expand Down Expand Up @@ -100,23 +98,10 @@ public void joined(SignedViewMember viewMember) {
}

public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
transitions.fsm().enterStartState();
}

void assemble(List<Assemblies> asses) {
if (!started.get()) {
if (!asses.isEmpty()) {
var viewz = asses.stream().flatMap(a -> a.getViewsList().stream()).toList();
var joinz = asses.stream().flatMap(a -> a.getJoinsList().stream()).toList();
log.debug("Not started, ignoring assemblies: {} joins: {} views: {} on: {}", asses.size(), joinz.size(),
viewz.size(), params().member().getId());
}
return;
}

if (asses.isEmpty()) {
return;
}
Expand Down Expand Up @@ -204,9 +189,6 @@ boolean complete() {
}

void join(List<SignedViewMember> joins) {
if (!started.get()) {
return;
}
if (selected == null) {
pendingJoins.addAll(joins);
log.trace("Pending joins: {} on: {}", joins.size(), params().member().getId());
Expand Down Expand Up @@ -473,7 +455,6 @@ public void failed() {
@Override
public void finish() {
countdown.set(-1);
started.set(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,13 @@ private boolean join(Member member, CompletableFuture<Bound> gateway, Optional<L
return false;
}
if (fs.isEmpty()) {
log.warn("No gateway returned from: {} on: {}", member.getId(), node.getId());
log.warn("No gateway returned from: {} on: {}", member == null ? "<null>" : member.getId(), node.getId());
dec(complete, remaining);
return true;
}
if (gateway.isDone()) {
log.warn("gateway is complete, ignoring from: {} on: {}", member.getId(), node.getId());
log.warn("gateway is complete, ignoring from: {} on: {}", member == null ? "<null>" : member.getId(),
node.getId());
complete.complete(true);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ void finalizeViewChange() {
HashMultiset<Ballot> ballots = HashMultiset.create();
observations.values().forEach(svu -> tally(svu, ballots));
viewManagement.clearVote();
scheduleClearObservations();
var max = ballots.entrySet()
.stream()
.max(Ordering.natural().onResultOf(Multiset.Entry::getCount))
Expand All @@ -372,13 +371,15 @@ void finalizeViewChange() {
viewManagement.cardinality(), currentView(), node.getId());
viewManagement.install(max.getElement());
scheduleViewChange();
scheduleClearObservations();
} else {
@SuppressWarnings("unchecked")
final var reversed = Comparator.comparing(e -> ((Entry<Ballot>) e).getCount()).reversed();
log.info("View consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}",
max == null ? 0 : max.getCount(), majority, viewManagement.cardinality(),
ballots.entrySet().stream().sorted(reversed).toList(), currentView(), node.getId());
viewManagement.initiateViewChange();
observations.clear();
scheduleViewChange();
}
});
}
Expand All @@ -392,7 +393,7 @@ Node getNode() {
return node;
}

boolean hasMajorityObservervations(boolean bootstrap) {
boolean hasMajorityObservations(boolean bootstrap) {
return bootstrap && context.size() == 1 || observations.size() >= context.majority();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ void maybeViewChange() {
}
}
var change = context.offlineCount() > 0 || !joins.isEmpty();
var shouldChange = isObserver() || view.hasMajorityObservervations(bootstrap);
var shouldChange = isObserver() || view.hasMajorityObservations(bootstrap);
if (change && shouldChange) {
initiateViewChange();
} else {
Expand Down

0 comments on commit bd8393a

Please sign in to comment.