Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc-7 #210

Merged
merged 14 commits into from
Jun 15, 2024
Merged
22 changes: 11 additions & 11 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DelegatedContext;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.context.ViewChange;
import com.salesforce.apollo.cryptography.*;
import com.salesforce.apollo.cryptography.Signer.SignerImpl;
import com.salesforce.apollo.cryptography.proto.PubKey;
Expand Down Expand Up @@ -322,15 +323,14 @@ public String logState() {

/**
* A view change has occurred
*
* @param context - the new membership context
* @param diadem - the compact HexBloom of the context view
*/
public void rotateViewKeys(Context<Member> context, Digest diadem) {
public void rotateViewKeys(ViewChange viewChange) {
var context = viewChange.context();
var diadem = viewChange.diadem();
((DelegatedContext<Member>) combine.getContext()).setContext(context);
var c = current.get();
if (c != null) {
c.nextView(diadem, context);
c.nextView(viewChange.diadem(), context);
} else {
log.info("Acquiring new view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
params.member().getId());
Expand Down Expand Up @@ -361,11 +361,11 @@ public void stop() {
}
session.cancelAll();
try {
linear.shutdown();
linear.shutdownNow();
} catch (Throwable e) {
}
try {
executions.shutdown();
executions.shutdownNow();
} catch (Throwable e) {
}
final var c = current.get();
Expand Down Expand Up @@ -1239,16 +1239,16 @@ private void synchronizationFailed() {
cancelSynchronization();
Context<Member> memberContext = context();
var activeCount = memberContext.size();
var majority = params.majority();
if (params.generateGenesis() && activeCount >= majority) {
var count = context().getRingCount();
if (params.generateGenesis() && activeCount >= context().getRingCount()) {
if (current.get() == null && current.compareAndSet(null, new Formation())) {
log.info(
"Quorum achieved, triggering regeneration. members: {} required: {} forming Genesis committee on: {}",
activeCount, majority, params.member().getId());
activeCount, count, params.member().getId());
transitions.regenerate();
} else {
log.info("Quorum achieved, members: {} required: {} existing committee: {} on: {}", activeCount,
majority, current.get().getClass().getSimpleName(), params.member().getId());
count, current.get().getClass().getSimpleName(), params.member().getId());
}
} else {
final var c = current.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
} else {
config.setPid(pid).setnProc((short) view.roster().size());
}
config.setEpochLength(7).setNumberOfEpochs(3);
config.setEpochLength(33).setNumberOfEpochs(-1);
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
transitions::process, transitions::nextEpoch, label);
Expand All @@ -106,13 +106,11 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,

@Override
public void certify() {
if (slate.size() < params().majority()) {
log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(),
params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId());
if (slate.size() != nextAssembly.size()) {
log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(),
slate.keySet().stream().sorted().toList(), params().member().getId());
return;
}
assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(),
slate.size());
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
params().digestAlgorithm())));
Expand All @@ -130,7 +128,7 @@ public void certify(List<ByteString> preblock, boolean last) {
try {
return Validate.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
log.warn("Unable to parse preblock: {} on: {}", bs, params().member().getId(), e);
log.trace("Unable to parse preblock: {} on: {}", bs, params().member().getId(), e);
return null;
}
}).filter(Objects::nonNull).filter(v -> !v.equals(Validate.getDefaultInstance())).forEach(this::certify);
Expand All @@ -154,7 +152,7 @@ public void gather(List<ByteString> preblock, boolean last) {
try {
return Join.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
log.warn("error parsing join: {} on: {}", bs, params().member().getId(), e);
log.trace("error parsing join: {} on: {}", bs, params().member().getId(), e);
return null;
}
})
Expand All @@ -163,6 +161,9 @@ public void gather(List<ByteString> preblock, boolean last) {
.peek(j -> log.info("Gathering: {} on: {}", Digest.from(j.getMember().getVm().getId()),
params().member().getId()))
.forEach(this::join);
if (slate.size() == nextAssembly.size()) {
transitions.gathered();
}
}

@Override
Expand All @@ -172,7 +173,7 @@ public void nominations(List<ByteString> preblock, boolean last) {
try {
return Validations.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
log.warn("error parsing validations: {} on: {}", bs, params().member().getId(), e);
log.trace("error parsing validations: {} on: {}", bs, params().member().getId(), e);
return null;
}
})
Expand All @@ -187,12 +188,12 @@ public void publish() {
log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId());
return;
}
if (witnesses.size() < params().majority()) {
if (witnesses.size() < nextAssembly.size()) {
log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(),
params().member().getId());
return;
}
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) {
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) {
log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash,
reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId());
return;
Expand Down
22 changes: 2 additions & 20 deletions choam/src/main/java/com/salesforce/apollo/choam/Parameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public record Parameters(Parameters.RuntimeParameters runtime, ReliableBroadcast
Parameters.BootstrapParameters bootstrap, Parameters.ProducerParameters producer,
Parameters.MvStoreBuilder mvBuilder, Parameters.LimiterBuilder txnLimiterBuilder,
ExponentialBackoffPolicy.Builder submitPolicy, int checkpointSegmentSize,
ExponentialBackoffPolicy.Builder drainPolicy, boolean generateGenesis) {
boolean generateGenesis) {

public static Builder newBuilder() {
return new Builder();
Expand Down Expand Up @@ -677,14 +677,6 @@ public static class Builder implements Cloneable {
private ReliableBroadcaster.Parameters combine = ReliableBroadcaster.Parameters.newBuilder()
.build();
private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT;
private ExponentialBackoffPolicy.Builder drainPolicy = ExponentialBackoffPolicy.newBuilder()
.setInitialBackoff(
Duration.ofMillis(5))
.setJitter(0.2)
.setMultiplier(1.2)
.setMaxBackoff(
Duration.ofMillis(
500));
private Digest genesisViewId;
private Duration gossipDuration = Duration.ofSeconds(1);
private int maxCheckpointSegments = 200;
Expand All @@ -709,7 +701,7 @@ public Parameters build(RuntimeParameters runtime) {
return new Parameters(runtime, combine, gossipDuration, maxCheckpointSegments, submitTimeout, genesisViewId,
checkpointBlockDelta, crowns, digestAlgorithm, viewSigAlgorithm,
synchronizationCycles, regenerationCycles, bootstrap, producer, mvBuilder,
txnLimiterBuilder, submitPolicy, checkpointSegmentSize, drainPolicy, generateGenesis);
txnLimiterBuilder, submitPolicy, checkpointSegmentSize, generateGenesis);
}

@Override
Expand All @@ -726,7 +718,6 @@ public Builder clone() {
producer.batchInterval, producer.maxBatchCount(), producer.maxGossipDelay));
clone.setTxnLimiterBuilder(txnLimiterBuilder.clone());
clone.setSubmitPolicy(submitPolicy.clone());
clone.setDrainPolicy(drainPolicy.clone());
return clone;
}

Expand Down Expand Up @@ -783,15 +774,6 @@ public Builder setDigestAlgorithm(DigestAlgorithm digestAlgorithm) {
return this;
}

public ExponentialBackoffPolicy.Builder getDrainPolicy() {
return drainPolicy;
}

public Builder setDrainPolicy(ExponentialBackoffPolicy.Builder drainPolicy) {
this.drainPolicy = drainPolicy;
return this;
}

public Digest getGenesisViewId() {
return genesisViewId;
}
Expand Down
20 changes: 7 additions & 13 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class Producer {
private final Semaphore serialize = new Semaphore(1);
private final ViewAssembly assembly;
private final int maxEpoch;
private volatile int emptyPreBlocks = 0;
private volatile boolean assembled = false;

public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) {
Expand All @@ -76,8 +75,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
maxEpoch = ep.getEpochLength();

ds = new TxDataSource(params.member(), blocks, params.metrics(), producerParams.maxBatchByteSize(),
producerParams.batchInterval(), producerParams.maxBatchCount(),
params().drainPolicy().build());
producerParams.batchInterval(), producerParams.maxBatchCount());

log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());
Expand Down Expand Up @@ -206,7 +204,7 @@ private void create(List<ByteString> preblock, boolean last) {
processAssemblies(aggregate);
processTransactions(last, aggregate);
if (last) {
started.set(true);
started.set(false);
transitions.lastBlock();
}
}
Expand Down Expand Up @@ -255,15 +253,11 @@ private void processTransactions(boolean last, List<UnitData> aggregate) {
final var txns = aggregate.stream().flatMap(e -> e.getTransactionsList().stream()).toList();

if (txns.isEmpty()) {
var empty = emptyPreBlocks + 1;
emptyPreBlocks = empty;
if (empty % 5 == 0) {
pending.values()
.stream()
.filter(pb -> pb.published.get())
.max(Comparator.comparing(pb -> pb.block.height()))
.ifPresent(pb -> publish(pb, true));
}
pending.values()
.stream()
.filter(pb -> pb.published.get())
.max(Comparator.comparing(pb -> pb.block.height()))
.ifPresent(pb -> publish(pb, true));
return;
}
log.trace("transactions: {} combined hash: {} height: {} on: {}", txns.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ public String toString() {
private class Recon implements Reconfiguration {
@Override
public void certify() {
countdown.set(-1);
if (proposals.size() == selected.assembly.size()) {
log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
Expand Down Expand Up @@ -440,6 +441,7 @@ public void chill() {

@Override
public void complete() {
countdown.set(-1);
ViewAssembly.this.complete();
}

Expand All @@ -454,12 +456,14 @@ public void convened() {

@Override
public void failed() {
countdown.set(-1);
view.onFailure();
log.debug("Failed view assembly for: {} on: {}", nextViewId, params().member().getId());
}

@Override
public void finish() {
countdown.set(-1);
started.set(false);
}

Expand Down
65 changes: 65 additions & 0 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,71 @@ public Transitions rotateViewKeys() {
public void failIt() {
context().fail();
}

@Override
public Transitions beginCheckpoint() {
return null;
}

@Override
public Transitions bootstrap(HashedCertifiedBlock anchor) {
return null;
}

@Override
public Transitions combine() {
return null;
}

@Override
public Transitions fail() {
return null;
}

@Override
public Transitions finishCheckpoint() {
return null;
}

@Override
public Transitions nextView() {
return null;
}

@Override
public Transitions regenerate() {
return null;
}

@Override
public Transitions regenerated() {
return null;
}

@Override
public Transitions rotateViewKeys() {
return null;
}

@Override
public Transitions start() {
return null;
}

@Override
public Transitions synchd() {
return null;
}

@Override
public Transitions synchronizationFailed() {
return null;
}

@Override
public Transitions synchronizing() {
return null;
}
}, RECOVERING {
@Override
public Transitions bootstrap(HashedCertifiedBlock anchor) {
Expand Down
11 changes: 9 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ public void gather() {

@Override
public Transitions nextEpoch(Integer epoch) {
return epoch.equals(0) ? null : CERTIFICATION;
return null;
}

@Override
public Transitions gathered() {
return CERTIFICATION;
}

@Override
Expand All @@ -80,11 +84,14 @@ public Transitions process(List<ByteString> preblock, boolean last) {
return null;
}
}

}

interface Transitions extends FsmExecutor<Genesis, Genesis.Transitions> {

default Transitions gathered() {
throw fsm().invalidTransitionOn();
}

default Transitions nextEpoch(Integer epoch) {
throw fsm().invalidTransitionOn();
}
Expand Down
Loading
Loading