diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 68fe30726..e17faf827 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -76,37 +76,36 @@ public class CHOAM { private static final Logger log = LoggerFactory.getLogger(CHOAM.class); - private final Map cachedCheckpoints = new ConcurrentHashMap<>(); - private final AtomicReference checkpoint = new AtomicReference<>(); - private final ReliableBroadcaster combine; - private final CommonCommunications comm; - private final AtomicReference current = new AtomicReference<>(); - private final AtomicReference> futureBootstrap = new AtomicReference<>(); - private final AtomicReference> futureSynchronization = new AtomicReference<>(); - private final AtomicReference genesis = new AtomicReference<>(); - private final AtomicReference head = new AtomicReference<>(); - private final AtomicReference next = new AtomicReference<>(); - private final AtomicReference nextViewId = new AtomicReference<>(); - private final Parameters params; - private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); - private final RoundScheduler roundScheduler; - private final Session session; - private final AtomicBoolean started = new AtomicBoolean(); - private final Store store; - private final CommonCommunications submissionComm; - private final Combine.Transitions transitions; - private final TransSubmission txnSubmission = new TransSubmission(); - private final AtomicReference view = new AtomicReference<>(); - private final PendingViews pendingViews = new PendingViews(); - private final ScheduledExecutorService scheduler; - private final ExecutorService linear; - private final AtomicBoolean ongoingJoin = new AtomicBoolean(); + private final Map cachedCheckpoints = new ConcurrentHashMap<>(); + private final AtomicReference checkpoint = new AtomicReference<>(); + private final ReliableBroadcaster combine; + private final CommonCommunications comm; + private final AtomicReference current = new AtomicReference<>(); + private final AtomicReference> futureBootstrap = new AtomicReference<>(); + private final AtomicReference> futureSynchronization = new AtomicReference<>(); + private final AtomicReference genesis = new AtomicReference<>(); + private final AtomicReference head = new AtomicReference<>(); + private final AtomicReference next = new AtomicReference<>(); + private final AtomicReference nextViewId = new AtomicReference<>(); + private final Parameters params; + private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); + private final RoundScheduler roundScheduler; + private final Session session; + private final AtomicBoolean started = new AtomicBoolean(); + private final Store store; + private final CommonCommunications submissionComm; + private final Combine.Transitions transitions; + private final TransSubmission txnSubmission = new TransSubmission(); + private final AtomicReference view = new AtomicReference<>(); + private final PendingViews pendingViews = new PendingViews(); + private final ScheduledExecutorService scheduler; + private final AtomicBoolean ongoingJoin = new AtomicBoolean(); + private volatile Thread linear; public CHOAM(Parameters params) { scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); this.params = params; - linear = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); pendingViews.add(params.context().getId(), params.context().delegate()); rotateViewKeys(); @@ -353,7 +352,11 @@ public void stop() { if (!started.compareAndSet(true, false)) { return; } - linear.shutdown(); + var l = linear; + linear = null; + if (l != null) { + l.interrupt(); + } try { scheduler.shutdownNow(); } catch (Throwable e) { @@ -434,39 +437,6 @@ private Block checkpoint() { return block; } - private void combine() { - var next = pending.peek(); - log.trace("Attempting to combine blocks, peek: {} height: {}, head: {} height: {} on: {}", - next == null ? "" : next.hash, next == null ? "-1" : next.height(), head.get().hash, - head.get().height(), params.member().getId()); - while (next != null) { - final HashedCertifiedBlock h = head.get(); - if (h.height() != null && next.height().compareTo(h.height()) <= 0) { - pending.poll(); - } else if (isNext(next)) { - if (current.get().validate(next)) { - HashedCertifiedBlock nextBlock = pending.poll(); - if (nextBlock == null) { - return; - } - accept(nextBlock); - } else { - log.debug("Unable to validate block: {} hash: {} height: {} on: {}", next.block.getBodyCase(), - next.hash, next.height(), params.member().getId()); - pending.poll(); - } - } else { - log.trace("Premature block: {} : {} height: {} current: {} on: {}", next.block.getBodyCase(), next.hash, - next.height(), h.height(), params.member().getId()); - return; - } - next = pending.peek(); - } - - log.trace("Finished combined, head: {} height: {} on: {}", head.get().hash, head.get().height(), - params.member().getId()); - } - private void combine(List messages) { messages.forEach(this::combine); transitions.combine(); @@ -542,7 +512,7 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo checkpoint.hash, v.height(), v.hash)) .setExecutions(executions) .build(); - log.trace("Produced block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(), + log.trace("Produce block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(), params.member().getId()); return block; } @@ -573,6 +543,82 @@ public Block reconfigure(Map joining, Digest nextViewId, HashedBlo }; } + private void consume(HashedCertifiedBlock next) { + log.trace("Attempting to consume: {} hash: {} height: {}, head: {} height: {} on: {}", next.block.getBodyCase(), + next.hash, next.height(), head.get().hash, head.get().height(), params.member().getId()); + final HashedCertifiedBlock h = head.get(); + + if (h.height() != null && next.height().compareTo(h.height()) <= 0) { + // block already past tense + log.debug("Stale: {} hash: {} height: {} on: {}", next.block.getBodyCase(), next.hash, next.height(), + params.member().getId()); + return; + } + + final var nlc = ULong.valueOf(next.block.getHeader().getLastReconfig()); + + var view = this.view.get().height(); + if (h.block == null || nlc.equals(view)) { + // same view + consume(next, h); + return; + } + + if (nlc.compareTo(view) > 0) { + // later view + log.trace("Wait for reconfiguration @ {} block: {} hash: {} height: {} current: {} on: {}", + next.block.getHeader().getLastReconfig(), next.block.getBodyCase(), next.hash, next.height(), + h.height(), params.member().getId()); + pending.add(next); + } else { + // invalid view + log.trace("Invalid view @ {} current: {} block: {} hash: {} height: {} current: {} on: {}", nlc, view, + next.block.getBodyCase(), next.hash, next.height(), h.height(), params.member().getId()); + } + } + + private void consume(HashedCertifiedBlock next, HashedCertifiedBlock cur) { + if (isNext(next)) { + if (current.get().validate(next)) { + log.trace("Accept: {} hash: {} height: {} on: {}", next.block.getBodyCase(), next.hash, next.height(), + params.member().getId()); + accept(next); + } else { + log.debug("Invalid block: {} hash: {} height: {} on: {}", next.block.getBodyCase(), next.hash, + next.height(), params.member().getId()); + } + } else { + log.trace("Premature block: {} : {} height: {} current: {} on: {}", next.block.getBodyCase(), next.hash, + next.height(), cur.height(), params.member().getId()); + pending.add(next); + } + } + + private void consumer() { + while (started.get()) { + HashedCertifiedBlock next = null; + try { + next = pending.poll(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + if (!started.get()) { + return; + } + if (next == null) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + continue; + } + consume(next); + } + } + private void execute(List execs) { final var h = head.get(); log.info("Executing transactions for block: {} hash: {} height: {} txns: {} on: {}", h.block.getBodyCase(), @@ -1196,7 +1242,13 @@ public void cancelTimer(String timer) { @Override public void combine() { - linear.execute(Utils.wrapped(() -> CHOAM.this.combine(), log)); + final var current = linear; + if (current == null) { + log.trace("Combining Consumer for: {} on: {}", context().getId(), params.member().getId()); + linear = Thread.ofVirtual() + .name("Linear[%s on: %s]".formatted(context().getId(), params.member().getId())) + .start(Utils.wrapped(CHOAM.this::consumer, log)); + } } @Override diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 25e4cb2bb..ffaa6fc35 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -28,7 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -59,7 +58,6 @@ public class ViewAssembly { private final CompletableFuture onConsensus; private final AtomicInteger countdown = new AtomicInteger(); private final List pendingJoins = new CopyOnWriteArrayList<>(); - private final AtomicBoolean started = new AtomicBoolean(false); private final Map joins = new ConcurrentHashMap<>(); private volatile Vue selected; @@ -100,23 +98,10 @@ public void joined(SignedViewMember viewMember) { } public void start() { - if (!started.compareAndSet(false, true)) { - return; - } transitions.fsm().enterStartState(); } void assemble(List asses) { - if (!started.get()) { - if (!asses.isEmpty()) { - var viewz = asses.stream().flatMap(a -> a.getViewsList().stream()).toList(); - var joinz = asses.stream().flatMap(a -> a.getJoinsList().stream()).toList(); - log.debug("Not started, ignoring assemblies: {} joins: {} views: {} on: {}", asses.size(), joinz.size(), - viewz.size(), params().member().getId()); - } - return; - } - if (asses.isEmpty()) { return; } @@ -204,9 +189,6 @@ boolean complete() { } void join(List joins) { - if (!started.get()) { - return; - } if (selected == null) { pendingJoins.addAll(joins); log.trace("Pending joins: {} on: {}", joins.size(), params().member().getId()); @@ -473,7 +455,6 @@ public void failed() { @Override public void finish() { countdown.set(-1); - started.set(false); } @Override diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 5da22769e..c00a41c44 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -253,12 +253,13 @@ private boolean join(Member member, CompletableFuture gateway, Optional" : member.getId(), node.getId()); dec(complete, remaining); return true; } if (gateway.isDone()) { - log.warn("gateway is complete, ignoring from: {} on: {}", member.getId(), node.getId()); + log.warn("gateway is complete, ignoring from: {} on: {}", member == null ? "" : member.getId(), + node.getId()); complete.complete(true); return false; } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 255396f9e..e1ff9f8e5 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -362,7 +362,6 @@ void finalizeViewChange() { HashMultiset ballots = HashMultiset.create(); observations.values().forEach(svu -> tally(svu, ballots)); viewManagement.clearVote(); - scheduleClearObservations(); var max = ballots.entrySet() .stream() .max(Ordering.natural().onResultOf(Multiset.Entry::getCount)) @@ -372,13 +371,15 @@ void finalizeViewChange() { viewManagement.cardinality(), currentView(), node.getId()); viewManagement.install(max.getElement()); scheduleViewChange(); + scheduleClearObservations(); } else { @SuppressWarnings("unchecked") final var reversed = Comparator.comparing(e -> ((Entry) e).getCount()).reversed(); log.info("View consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}", max == null ? 0 : max.getCount(), majority, viewManagement.cardinality(), ballots.entrySet().stream().sorted(reversed).toList(), currentView(), node.getId()); - viewManagement.initiateViewChange(); + observations.clear(); + scheduleViewChange(); } }); } @@ -392,7 +393,7 @@ Node getNode() { return node; } - boolean hasMajorityObservervations(boolean bootstrap) { + boolean hasMajorityObservations(boolean bootstrap) { return bootstrap && context.size() == 1 || observations.size() >= context.majority(); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index f4c77e354..900015a02 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -489,7 +489,7 @@ void maybeViewChange() { } } var change = context.offlineCount() > 0 || !joins.isEmpty(); - var shouldChange = isObserver() || view.hasMajorityObservervations(bootstrap); + var shouldChange = isObserver() || view.hasMajorityObservations(bootstrap); if (change && shouldChange) { initiateViewChange(); } else {