Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

super-majority #214

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
db9c349
use sampling for msg limits
Hellblazer Jun 18, 2024
a82a337
max
Hellblazer Jun 18, 2024
b63093d
no sampling of accusations
Hellblazer Jun 18, 2024
e191df8
no sampling of observations
Hellblazer Jun 18, 2024
4a47caa
better shuffling, etc.
Hellblazer Jun 18, 2024
1f0a47f
d'oh. observers now concurrent skip list. reject joins if not observe…
Hellblazer Jun 18, 2024
eae25e7
enjoin. replicated join to observers in the cluster
Hellblazer Jun 19, 2024
321724d
interim
Hellblazer Jun 22, 2024
2ca8e1c
fixins. Reimplement ReservoirSampler. 5x5
Hellblazer Jun 22, 2024
5b3084e
fixins. Reimplement ReservoirSampler. 5x5
Hellblazer Jun 22, 2024
6d6bd30
handle unknown accusations, initiate view change under lock
Hellblazer Jun 22, 2024
129fa49
serialize via single threaded executor
Hellblazer Jun 23, 2024
b158e5c
cleanup
Hellblazer Jun 23, 2024
fe46823
handle error already sent
Hellblazer Jun 23, 2024
7d18010
multiple fixin's
Hellblazer Jun 23, 2024
1aab2ab
Max length BLOBs for all binary values. reuse/close scheduled executors.
Hellblazer Jun 27, 2024
fb7f118
consolidate executors - just fork threads - and schedulers. better co…
Hellblazer Jun 28, 2024
e40b25c
consolidate executors - just fork threads - and schedulers. better co…
Hellblazer Jun 28, 2024
a799b86
firm up join
Hellblazer Jun 29, 2024
a6ae389
don't use parallel stream on join.
Hellblazer Jun 29, 2024
dfa1e01
decouple
Hellblazer Jun 29, 2024
7d22597
better observation attempt lifecycle. use executor for linearizing
Hellblazer Jun 30, 2024
bd8393a
moar observation lifecycle fixie. don't "start/end" ViewAssembly
Hellblazer Jul 2, 2024
867de1a
subtle
Hellblazer Jul 4, 2024
d743f34
oops
Hellblazer Jul 4, 2024
2bdb1b7
refactor bft subset. simplify TxDataSource
Hellblazer Jul 4, 2024
a143a36
require 11; moar TxDataSource tweak
Hellblazer Jul 4, 2024
fd77fdf
D'oh. Fix maxEpochs
Hellblazer Jul 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 152 additions & 122 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.cryptography.Verifier;
import com.salesforce.apollo.cryptography.Verifier.DefaultVerifier;
import com.salesforce.apollo.ethereal.Dag;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.MockMember;
import io.grpc.StatusRuntimeException;
Expand All @@ -35,6 +36,8 @@ public interface Committee {

static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Member> context, Digest member,
Logger log) {
assert Dag.validate(reconfigure.getJoinsCount()) : "Reconfigure joins: %s is not BFT".formatted(
reconfigure.getJoinsCount());
var validators = reconfigure.getJoinsList().stream().collect(Collectors.toMap(e -> {
var id = new Digest(e.getMember().getVm().getId());
var m = context.getMember(id);
Expand All @@ -47,7 +50,6 @@ static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Membe
}, e -> {
var vm = e.getMember().getVm();
if (vm.hasConsensusKey()) {

return new DefaultVerifier(publicKey(vm.getConsensusKey()));
} else {
log.info("No member for validator: {}, returning mock on: {}", Digest.from(vm.getId()), member);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.salesforce.apollo.choam.support.HashedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock.NullBlock;
import com.salesforce.apollo.choam.support.OneShot;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.cryptography.Digest;
Expand All @@ -32,7 +31,7 @@

import java.security.PublicKey;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand All @@ -45,26 +44,27 @@
* @author hal.hildebrand
*/
public class GenesisAssembly implements Genesis {
private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);
private final Ethereal controller;
private final ChRbcGossip coordinator;
private final SignedViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final AtomicBoolean published = new AtomicBoolean();
private final Map<Digest, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private final OneShot ds;
private final List<Validate> pendingValidations = new ArrayList<>();
private volatile Thread blockingThread;
private volatile HashedBlock reconfiguration;
private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);
private final Ethereal controller;
private final ChRbcGossip coordinator;
private final SignedViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final AtomicBoolean published = new AtomicBoolean();
private final Map<Digest, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private final BlockingDeque<ByteString> ds;
private final List<Validate> pendingValidations = new ArrayList<>();
private final ScheduledExecutorService scheduler;
private volatile HashedBlock reconfiguration;

public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms, SignedViewMember genesisMember,
String label) {
String label, ScheduledExecutorService scheduler) {
view = vc;
ds = new OneShot();
this.scheduler = scheduler;
ds = new LinkedBlockingDeque<>(1024);
Digest hash = view.context().getId();
nextAssembly = ((Set<Member>) ((Context<? super Member>) view.pendingViews().last().context()).bftSubset(
hash)).stream().collect(Collectors.toMap(Member::getId, m -> m));
Expand Down Expand Up @@ -99,7 +99,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
transitions::process, transitions::nextEpoch, label);
coordinator = new ChRbcGossip(reContext.getId(), params().member(), nextAssembly.values(),
controller.processor(), params().communications(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
params().metrics() == null ? null : params().metrics().getGensisMetrics(),
scheduler);
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
reContext.getId(), nextAssembly.keySet(), params().member().getId());
}
Expand All @@ -117,7 +118,7 @@ public void certify() {
var validate = view.generateValidation(reconfiguration);
log.debug("Certifying genesis block: {} for: {} slate: {} on: {}", reconfiguration.hash, view.context().getId(),
slate.keySet().stream().sorted().toList(), params().member().getId());
ds.setValue(validate.toByteString());
ds.add(validate.toByteString());
witnesses.put(params().member(), validate);
pendingValidations.forEach(v -> certify(v));
}
Expand All @@ -140,7 +141,7 @@ public void gather() {
var join = Join.newBuilder().setMember(genesisMember).setKerl(params().kerl().get()).build();
slate.put(params().member().getId(), join);

ds.setValue(join.toByteString());
ds.add(join.toByteString());
coordinator.start(params().producer().gossipDuration());
controller.start();
}
Expand Down Expand Up @@ -230,11 +231,6 @@ public void stop() {
log.trace("Stopping genesis assembly: {} on: {}", view.context().getId(), params().member().getId());
coordinator.stop();
controller.stop();
final var cur = blockingThread;
blockingThread = null;
if (cur != null) {
cur.interrupt();
}
}

private void certify(Validate v) {
Expand Down Expand Up @@ -262,11 +258,11 @@ private DataSource dataSource() {
return ByteString.EMPTY;
}
try {
blockingThread = Thread.currentThread();
final var take = ds.get();
return take;
} finally {
blockingThread = null;
var data = ds.poll(100, TimeUnit.MILLISECONDS);
return data == null ? ByteString.EMPTY : data;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public static class Builder implements Cloneable {
private Supplier<KERL_> kerl = () -> KERL_.getDefaultInstance();
private SigningMember member;
private ChoamMetrics metrics;
private TransactionExecutor processor = (i, h, t, f, exec) -> {
private TransactionExecutor processor = (i, h, t, f) -> {
};
private BiConsumer<HashedBlock, CheckpointState> restorer = (height, checkpointState) -> {
};
Expand Down
Loading
Loading