Skip to content

Commit

Permalink
better observation attempt lifecycle. use executor for linearizing
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 30, 2024
1 parent dfa1e01 commit 7d22597
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 119 deletions.
71 changes: 25 additions & 46 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ public class CHOAM {
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final ScheduledExecutorService scheduler;
private final Semaphore linear = new Semaphore(1);
private final ExecutorService linear;
private final AtomicBoolean ongoingJoin = new AtomicBoolean();

public CHOAM(Parameters params) {
scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
linear = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
pendingViews.add(params.context().getId(), params.context().delegate());

rotateViewKeys();
Expand All @@ -116,18 +117,16 @@ public CHOAM(Parameters params) {
combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(),
params.metrics() == null ? null : params.metrics().getCombineMetrics(),
adapter);
combine.registerHandler((_, messages) -> {
Thread.ofVirtual().start(() -> {
if (!started.get()) {
return;
}
try {
combine(messages);
} catch (Throwable t) {
log.error("Failed to combine messages on: {}", params.member().getId(), t);
}
});
});
combine.registerHandler((_, messages) -> Thread.ofVirtual().start(() -> {
if (!started.get()) {
return;
}
try {
combine(messages);
} catch (Throwable t) {
log.error("Failed to combine messages on: {}", params.member().getId(), t);
}
}));
head.set(new NullBlock(params.digestAlgorithm()));
view.set(new NullBlock(params.digestAlgorithm()));
checkpoint.set(new NullBlock(params.digestAlgorithm()));
Expand Down Expand Up @@ -354,22 +353,25 @@ public void stop() {
if (!started.compareAndSet(true, false)) {
return;
}
linear.release(10000);
linear.shutdown();
try {
scheduler.shutdownNow();
} catch (Throwable e) {
// ignore
}
session.cancelAll();
final var c = current.get();
if (c != null) {
try {
c.complete();
} catch (Throwable e) {
// ignore
}
}
try {
combine.stop();
} catch (Throwable e) {
// ignore
}
}

Expand Down Expand Up @@ -577,12 +579,11 @@ private void execute(List<Transaction> execs) {
h.hash, h.height(), execs.size(), params.member().getId());
for (int i = 0; i < execs.size(); i++) {
var exec = execs.get(i);
final var index = i;
Digest hash = hashOf(exec, params.digestAlgorithm());
var stxn = session.complete(hash);
try {
params.processor()
.execute(index, CHOAM.hashOf(exec, params.digestAlgorithm()), exec,
.execute(i, CHOAM.hashOf(exec, params.digestAlgorithm()), exec,
stxn == null ? null : stxn.onCompletion());
} catch (Throwable t) {
log.error("Exception processing transaction: {} block: {} height: {} on: {}", hash, h.hash, h.height(),
Expand All @@ -591,7 +592,7 @@ private void execute(List<Transaction> execs) {
}
}

private CheckpointSegments fetch(CheckpointReplication request, Digest from) {
private CheckpointSegments fetch(CheckpointReplication request) {
CheckpointState state = cachedCheckpoints.get(ULong.valueOf(request.getCheckpoint()));
if (state == null) {
log.info("No cached checkpoint for {} on: {}", request.getCheckpoint(), params.member().getId());
Expand All @@ -604,14 +605,14 @@ private CheckpointSegments fetch(CheckpointReplication request, Digest from) {
.build();
}

private Blocks fetchBlocks(BlockReplication rep, Digest from) {
private Blocks fetchBlocks(BlockReplication rep) {
BloomFilter<ULong> bff = BloomFilter.from(rep.getBlocksBff());
Blocks.Builder blocks = Blocks.newBuilder();
store.fetchBlocks(bff, blocks, 100, ULong.valueOf(rep.getFrom()), ULong.valueOf(rep.getTo()));
return blocks.build();
}

private Blocks fetchViewChain(BlockReplication rep, Digest from) {
private Blocks fetchViewChain(BlockReplication rep) {
BloomFilter<ULong> bff = BloomFilter.from(rep.getBlocksBff());
Blocks.Builder blocks = Blocks.newBuilder();
store.fetchViewChain(bff, blocks, 100, ULong.valueOf(rep.getFrom()), ULong.valueOf(rep.getTo()));
Expand Down Expand Up @@ -649,7 +650,7 @@ private boolean isNext(HashedBlock next) {
return isNext;
}

private Empty join(SignedViewMember nextView, Digest from) {
private void join(SignedViewMember nextView, Digest from) {
var c = current.get();
if (c == null) {
log.trace("No committee for: {} to join: {} diadem: {} on: {}", from,
Expand All @@ -658,7 +659,6 @@ private Empty join(SignedViewMember nextView, Digest from) {
throw new StatusRuntimeException(FAILED_PRECONDITION);
}
c.join(nextView, from);
return Empty.getDefaultInstance();
}

private Supplier<PendingViews> pendingViews() {
Expand Down Expand Up @@ -1196,25 +1196,7 @@ public void cancelTimer(String timer) {

@Override
public void combine() {
Thread.ofVirtual().start(Utils.wrapped(() -> {
if (!started.get()) {
return;
}
try {
linear.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
try {
CHOAM.this.combine();
} finally {
linear.release();
}
}, log));
if (linear.getQueueLength() > 0) {
log.info("Linear Q: {} on: {}", linear.getQueueLength(), params.member().getId());
}
linear.execute(Utils.wrapped(() -> CHOAM.this.combine(), log));
}

@Override
Expand Down Expand Up @@ -1270,17 +1252,17 @@ public class Trampoline implements Concierge {

@Override
public CheckpointSegments fetch(CheckpointReplication request, Digest from) {
return CHOAM.this.fetch(request, from);
return CHOAM.this.fetch(request);
}

@Override
public Blocks fetchBlocks(BlockReplication request, Digest from) {
return CHOAM.this.fetchBlocks(request, from);
return CHOAM.this.fetchBlocks(request);
}

@Override
public Blocks fetchViewChain(BlockReplication request, Digest from) {
return CHOAM.this.fetchViewChain(request, from);
return CHOAM.this.fetchViewChain(request);
}

@Override
Expand Down Expand Up @@ -1506,9 +1488,6 @@ private Attempt join(View view, Terminal t, SignedViewMember svm) {

// Pairs a committee member with the in-flight RPC future for its join attempt,
// so the caller can correlate the async completion (or failure) back to the member.
record Attempt(Member m, ListenableFuture<Empty> fs) {
}

private record JoinState(AtomicBoolean halt, Thread joining) {
}
}

/** a member of the current committee */
Expand Down
58 changes: 16 additions & 42 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class Producer {
private final Transitions transitions;
private final ViewContext view;
private final Digest nextViewId;
private final Semaphore serialize = new Semaphore(1);
private final ExecutorService serialize;
private final ViewAssembly assembly;
private final int maxEpoch;
private final AtomicBoolean assembled = new AtomicBoolean(false);
Expand Down Expand Up @@ -94,7 +94,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
log.trace("Pid: {} for: {} on: {}", pid, getViewId(), params().member().getId());
config.setPid(pid).setnProc((short) view.roster().size());
}

serialize = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
config.setLabel("Producer" + getViewId() + " on: " + params().member().getId());
var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics();
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial,
Expand Down Expand Up @@ -155,7 +155,7 @@ public void stop() {
return;
}
log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId());
serialize.release(10000);
serialize.shutdown();
controller.stop();
coordinator.stop();
ds.close();
Expand Down Expand Up @@ -221,32 +221,18 @@ private Digest getViewId() {
}

private void newEpoch(Integer epoch) {
Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
serialize.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
try {
log.trace("new epoch: {} on: {}", epoch, params().member().getId());
assembly.newEpoch();
var last = epoch >= maxEpoch && assembled.get();
if (last) {
controller.completeIt();
Producer.this.transitions.viewComplete();
} else {
ds.reset();
}
transitions.newEpoch(epoch, last);
} finally {
serialize.release();
serialize.execute(Utils.wrapped(() -> {
log.trace("new epoch: {} on: {}", epoch, params().member().getId());
assembly.newEpoch();
var last = epoch >= maxEpoch && assembled.get();
if (last) {
controller.completeIt();
Producer.this.transitions.viewComplete();
} else {
ds.reset();
}
transitions.newEpoch(epoch, last);
}, log));
var awaiting = serialize.getQueueLength();
if (awaiting > 0) {
log.error("Serialize: {} on: {}", awaiting, params().member().getId());
}
}

private Parameters params() {
Expand Down Expand Up @@ -374,27 +360,14 @@ private void reconfigure() {
}

private void serial(List<ByteString> preblock, Boolean last) {
Thread.ofVirtual().start(() -> {
try {
serialize.acquire();
create(preblock, last);
} catch (Throwable t) {
log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t);
} finally {
serialize.release();
}
});
var awaiting = serialize.getQueueLength();
if (awaiting > 0) {
log.error("Serialize: {} on: {}", awaiting, params().member().getId());
}
serialize.execute(Utils.wrapped(() -> create(preblock, last), log));
}

private PendingBlock validate(Validate v) {
Digest hash = Digest.from(v.getHash());
var p = pending.get(hash);
if (p == null) {
pendingValidations.computeIfAbsent(hash, h -> new CopyOnWriteArrayList<>()).add(v);
pendingValidations.computeIfAbsent(hash, _ -> new CopyOnWriteArrayList<>()).add(v);
return null;
}
return validate(v, p, hash);
Expand Down Expand Up @@ -442,6 +415,7 @@ public void checkpoint() {
final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean());
pending.put(next.hash, p);
p.witnesses.put(params().member(), validation);
assert next.block != null;
log.info("Produced: {} hash: {} height: {} for: {} on: {}", next.block.getBodyCase(), next.hash,
next.height(), getViewId(), params().member().getId());
processPendingValidations(next, p);
Expand Down
12 changes: 1 addition & 11 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ public void assemble() {
public Transitions assembled() {
return SPICE;
}

@Override
public Transitions newEpoch(int epoch, boolean lastEpoch) {
return lastEpoch ? PROTOCOL_FAILURE : null;
}
}, CHECKPOINTING {
@Entry
public void check() {
Expand Down Expand Up @@ -116,11 +111,6 @@ public void terminate() {
context().fail();
}
}, SPICE {
@Override
public Transitions newEpoch(int epoch, boolean lastEpoch) {
return lastEpoch ? PROTOCOL_FAILURE : null;
}

@Override
public Transitions viewComplete() {
return END_EPOCHS;
Expand Down Expand Up @@ -157,7 +147,7 @@ default Transitions lastBlock() {
}

default Transitions newEpoch(int epoch, boolean lastEpoch) {
throw fsm().invalidTransitionOn();
return null;
}

default Transitions start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public void start(Duration duration, Predicate<FernetServerInterceptor.HashedTok
log.error("Error in gossip on: {}", member.getId(), e);
}
}, log)), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
log.trace("Rejected scheduling gossip on: {}", member.getId());
} catch (Throwable e) {
log.error("Error in gossip on: {}", member.getId(), e);
}
Expand Down
19 changes: 13 additions & 6 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ Node getNode() {
return node;
}

/**
 * Whether enough view-change observations have accumulated to proceed.
 * <p>
 * A singleton context during bootstrap is trivially a majority; otherwise the
 * count of collected observations must reach the context's majority threshold.
 * <p>
 * NOTE(review): method name contains a typo ("Observervations"); kept as-is
 * because callers elsewhere in the file reference this exact name.
 *
 * @param bootstrap true when the view is bootstrapping
 * @return true if a majority of observations (or the bootstrap singleton case) holds
 */
boolean hasMajorityObservervations(boolean bootstrap) {
    if (bootstrap && context.size() == 1) {
        // Sole member bootstrapping: its own observation is the majority.
        return true;
    }
    return observations.size() >= context.majority();
}

// True while any accused member still has an outstanding rebuttal window open;
// used to defer view changes until accusations are resolved.
boolean hasPendingRebuttals() {
    return !pendingRebuttals.isEmpty();
}
Expand Down Expand Up @@ -835,7 +839,8 @@ private boolean add(NoteWrapper note) {
*/
private boolean add(SignedViewChange observation) {
var svu = new SVU(observation, digestAlgo);
if (!viewManagement.isObserver(svu.observer)) {
var highWater = viewManagement.highWater(svu.observer);
if (highWater == null) {
log.trace("Invalid observer: {} current: {} on: {}", svu.observer, currentView(), node.getId());
return false;
}
Expand All @@ -845,17 +850,17 @@ private boolean add(SignedViewChange observation) {
node.getId());
return false;
}
if (highWater >= svu.attempt) {
log.trace("Redundant view change: {} current: {} view: {} from {} on: {}", svu.attempt, highWater,
currentView(), svu.observer, node.getId());
return false;
}
final var member = context.getActiveMember(svu.observer);
if (member == null) {
log.trace("Cannot validate view change: {} current: {} from: {} on: {}", inView, currentView(),
svu.observer, node.getId());
return false;
}
if (!viewManagement.isObserver(member.id)) {
log.trace("Not an observer of: {} current: {} from: {} on: {}", inView, currentView(), svu.observer,
node.getId());
return false;
}
final var signature = JohnHancock.from(observation.getSignature());
if (!member.verify(signature, observation.getChange().toByteString())) {
return false;
Expand All @@ -866,6 +871,8 @@ private boolean add(SignedViewChange observation) {
log.trace("Stale observation: {} current: {} view change: {} current: {} offline: {} on: {}",
svu.attempt, cur.attempt, inView, currentView(), svu.observer, node.getId());
return cur;
} else {
viewManagement.updateHighWater(d, svu.attempt);
}
}
log.trace("Observation: {} current: {} view change: {} from: {} on: {}", svu.attempt, inView, currentView(),
Expand Down
Loading

0 comments on commit 7d22597

Please sign in to comment.