Skip to content

Commit

Permalink
Max length BLOBs for all binary values. reuse/close scheduled executors.
Browse files Browse the repository at this point in the history
Pretty massive refactoring of the stereotomy and thoth schemas.  :: sigh ::
  • Loading branch information
Hellblazer committed Jun 27, 2024
1 parent 7d18010 commit 1aab2ab
Show file tree
Hide file tree
Showing 33 changed files with 577 additions and 561 deletions.
61 changes: 34 additions & 27 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,11 @@ public class CHOAM {
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final ScheduledExecutorService scheduler;
private volatile AtomicBoolean ongoingJoin;

public CHOAM(Parameters params) {
scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
executions = Executors.newVirtualThreadPerTaskExecutor();
Expand Down Expand Up @@ -351,11 +353,19 @@ public void stop() {
if (!started.compareAndSet(true, false)) {
return;
}
session.cancelAll();
try {
linear.shutdownNow();
} catch (Throwable e) {
}
try {
scheduler.shutdownNow();
} catch (Throwable e) {
}
session.cancelAll();
try {
session.stop();
} catch (Throwable e) {
}
try {
executions.shutdownNow();
} catch (Throwable e) {
Expand Down Expand Up @@ -1393,31 +1403,28 @@ private void join(View view) {
var joined = new AtomicInteger();
var halt = new AtomicBoolean(false);
ongoingJoin = halt;
Thread.ofVirtual().start(Utils.wrapped(() -> {
log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());

var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory());
AtomicReference<Runnable> action = new AtomicReference<>();
var attempts = new AtomicInteger();
action.set(() -> {
log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(),
halt.get(), joined.get(), view.getMajority(), params.member().getId());
if (!halt.get() & joined.get() < view.getMajority()) {
join(view, servers, joined);
if (joined.get() >= view.getMajority()) {
ongoingJoin = null;
log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
Digest.from(view.getDiadem()), joined.get(), params.member().getId());
} else if (!halt.get()) {
log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
Digest.from(view.getDiadem()), joined.get(), params.member().getId());
scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS);
}
log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory());
AtomicReference<Runnable> action = new AtomicReference<>();
var attempts = new AtomicInteger();
action.set(() -> {
log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(),
halt.get(), joined.get(), view.getMajority(), params.member().getId());
if (!halt.get() & joined.get() < view.getMajority()) {
join(view, servers, joined);
if (joined.get() >= view.getMajority()) {
ongoingJoin = null;
log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
Digest.from(view.getDiadem()), joined.get(), params.member().getId());
} else if (!halt.get()) {
log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
Digest.from(view.getDiadem()), joined.get(), params.member().getId());
scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS);
}
});
scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS);
}, log()));
}
});
scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS);
}

private void join(View view, Collection<Member> members, AtomicInteger joined) {
Expand Down Expand Up @@ -1520,7 +1527,7 @@ private class Associate extends Administration {
var pv = pendingViews();
producer = new Producer(nextViewId.get(),
new ViewContext(context, params, pv, signer, validators, constructBlock()),
head.get(), checkpoint.get(), getLabel());
head.get(), checkpoint.get(), getLabel(), scheduler);
producer.start();
}

Expand Down Expand Up @@ -1575,7 +1582,7 @@ private Formation() {
.setVm(inView)
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
assembly = new GenesisAssembly(vc, comm, svm, getLabel());
assembly = new GenesisAssembly(vc, comm, svm, getLabel(), scheduler);
log.info("Setting next view id to genesis: {} on: {}", params.genesisViewId(), params.member().getId());
nextViewId.set(params.genesisViewId());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.salesforce.apollo.choam.support.HashedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock.NullBlock;
import com.salesforce.apollo.choam.support.OneShot;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.cryptography.Digest;
Expand All @@ -32,7 +31,7 @@

import java.security.PublicKey;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand All @@ -45,26 +44,27 @@
* @author hal.hildebrand
*/
public class GenesisAssembly implements Genesis {
private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);
private final Ethereal controller;
private final ChRbcGossip coordinator;
private final SignedViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final AtomicBoolean published = new AtomicBoolean();
private final Map<Digest, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private final OneShot ds;
private final List<Validate> pendingValidations = new ArrayList<>();
private volatile Thread blockingThread;
private volatile HashedBlock reconfiguration;
private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);
private final Ethereal controller;
private final ChRbcGossip coordinator;
private final SignedViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final AtomicBoolean published = new AtomicBoolean();
private final Map<Digest, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private final BlockingDeque<ByteString> ds;
private final List<Validate> pendingValidations = new ArrayList<>();
private final ScheduledExecutorService scheduler;
private volatile HashedBlock reconfiguration;

public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms, SignedViewMember genesisMember,
String label) {
String label, ScheduledExecutorService scheduler) {
view = vc;
ds = new OneShot();
this.scheduler = scheduler;
ds = new LinkedBlockingDeque<>(1024);
Digest hash = view.context().getId();
nextAssembly = ((Set<Member>) ((Context<? super Member>) view.pendingViews().last().context()).bftSubset(
hash)).stream().collect(Collectors.toMap(Member::getId, m -> m));
Expand Down Expand Up @@ -99,7 +99,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
transitions::process, transitions::nextEpoch, label);
coordinator = new ChRbcGossip(reContext.getId(), params().member(), nextAssembly.values(),
controller.processor(), params().communications(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
params().metrics() == null ? null : params().metrics().getGensisMetrics(),
scheduler);
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
reContext.getId(), nextAssembly.keySet(), params().member().getId());
}
Expand All @@ -117,7 +118,7 @@ public void certify() {
var validate = view.generateValidation(reconfiguration);
log.debug("Certifying genesis block: {} for: {} slate: {} on: {}", reconfiguration.hash, view.context().getId(),
slate.keySet().stream().sorted().toList(), params().member().getId());
ds.setValue(validate.toByteString());
ds.add(validate.toByteString());
witnesses.put(params().member(), validate);
pendingValidations.forEach(v -> certify(v));
}
Expand All @@ -140,7 +141,7 @@ public void gather() {
var join = Join.newBuilder().setMember(genesisMember).setKerl(params().kerl().get()).build();
slate.put(params().member().getId(), join);

ds.setValue(join.toByteString());
ds.add(join.toByteString());
coordinator.start(params().producer().gossipDuration());
controller.start();
}
Expand Down Expand Up @@ -230,11 +231,6 @@ public void stop() {
log.trace("Stopping genesis assembly: {} on: {}", view.context().getId(), params().member().getId());
coordinator.stop();
controller.stop();
final var cur = blockingThread;
blockingThread = null;
if (cur != null) {
cur.interrupt();
}
}

private void certify(Validate v) {
Expand Down Expand Up @@ -262,11 +258,11 @@ private DataSource dataSource() {
return ByteString.EMPTY;
}
try {
blockingThread = Thread.currentThread();
final var take = ds.get();
return take;
} finally {
blockingThread = null;
var data = ds.poll(100, TimeUnit.MILLISECONDS);
return data == null ? ByteString.EMPTY : data;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
};
}
Expand Down
11 changes: 7 additions & 4 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ public class Producer {
private final Transitions transitions;
private final ViewContext view;
private final Digest nextViewId;
private final Executor serialize = Executors.newSingleThreadExecutor(
Thread.ofVirtual().factory());
private final ExecutorService serialize = Executors.newSingleThreadExecutor();
private final ViewAssembly assembly;
private final int maxEpoch;
private final ScheduledExecutorService scheduler;
private volatile boolean assembled = false;

public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) {
public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label,
ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
assert view != null;
this.view = view;
this.previousBlock.set(lastBlock);
Expand Down Expand Up @@ -99,7 +101,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial,
this::newEpoch, label);
coordinator = new ChRbcGossip(view.context().getId(), params().member(), view.membership(),
controller.processor(), params().communications(), producerMetrics);
controller.processor(), params().communications(), producerMetrics, scheduler);
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());

var onConsensus = new CompletableFuture<ViewAssembly.Vue>();
Expand Down Expand Up @@ -148,6 +150,7 @@ public void stop() {
return;
}
log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId());
serialize.shutdown();
controller.stop();
coordinator.stop();
ds.close();
Expand Down
4 changes: 4 additions & 0 deletions choam/src/main/java/com/salesforce/apollo/choam/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public void setView(HashedCertifiedBlock v) {
}
}

public void stop() {
scheduler.shutdown();
}

/**
* Submit a transaction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) {
diadem.compactWrapped(), member.getId());

var ringer = new SliceIterator<>("Assembly[%s:%s]".formatted(diadem.compactWrapped(), member.getId()), member,
committee, comms);
committee, comms, scheduler);
ringer.iterate((link) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId());
return gossip(link);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -175,7 +176,8 @@ public Block reconfigure(Map<Digest, Join> joining, Digest nextViewId, HashedBlo
.setVm(vm)
.setSignature(((SigningMember) m).sign(vm.toByteString()).toSig())
.build();
genii.put(m, new GenesisAssembly(view, comms.get(m), svm, m.getId().toString()));
genii.put(m, new GenesisAssembly(view, comms.get(m), svm, m.getId().toString(),
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())));
});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import java.util.UUID;
import java.util.stream.Stream;

import static com.salesforce.apollo.cryptography.DigestAlgorithm.EMPTY;

/**
* A computed digest
*
* @author hal.hildebrand
*/
public class Digest implements Comparable<Digest> {
public static final Digest NONE = new Digest(DigestAlgorithm.NONE, new byte[0]) {
public static final Digest NONE = new Digest(DigestAlgorithm.NONE, new long[] { 0L }) {

@Override
public String toString() {
Expand Down Expand Up @@ -74,11 +76,15 @@ public Digest(DigestAlgorithm algo, long[] hash) {

public Digest(Digeste d) {
algorithm = DigestAlgorithm.fromDigestCode(d.getType());
assert d.getHashCount() == algorithm.longLength();
hash = new long[d.getHashCount()];
int i = 0;
for (long l : d.getHashList()) {
hash[i++] = l;
if (algorithm.equals(DigestAlgorithm.NONE)) {
hash = EMPTY;
} else {
assert d.getHashCount() == algorithm.longLength();
hash = new long[d.getHashCount()];
int i = 0;
for (long l : d.getHashList()) {
hash[i++] = l;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,12 @@ public byte digestCode() {

@Override
public int digestLength() {
return 0;
return 8;
}

@Override
public int longLength() {
return 1;
}

@Override
Expand All @@ -237,12 +242,12 @@ public Digest getOrigin() {

@Override
public byte[] hashOf(byte[] bytes, int len) {
return EMPTY;
return EMPTY_BYTES;
}

@Override
public byte[] hashOf(InputStream is) {
return EMPTY;
return EMPTY_BYTES;
}
}, SHA2_256 {
@Override
Expand Down Expand Up @@ -316,7 +321,8 @@ public int digestLength() {

public static final DigestAlgorithm DEFAULT = BLAKE2B_256;
public static final long MAX_UNSIGNED_LONG = -1L;
private static final byte[] EMPTY = new byte[0];
public static final long[] EMPTY = new long[] { 0L };
public static final byte[] EMPTY_BYTES = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 };
private static final long[] LAST_32 = new long[4];
private static final long[] LAST_64 = new long[8];
private static final ThreadLocal<DigestCache> MESSAGE_DIGEST = ThreadLocal.withInitial(() -> new DigestCache());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,10 @@ assert parentsOnPreviousLevel(u) >= quorum : "Parents: " + Arrays.asList(u.paren
private ByteString getData(int level) {
if (level < conf.lastLevel()) {
if (ds != null) {
log.info("Requesting timing unit: {} on: {}", level, conf.logLabel());
return ds.getData();
}
log.info("No datasource for timing unit: {} on: {}", level, conf.logLabel());
return ByteString.EMPTY;
}
Unit timingUnit = lastTiming.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

/**
* @author hal.hildebrand
*
*/
@FunctionalInterface
public interface DataSource {

ByteString getData();
Expand Down
Loading

0 comments on commit 1aab2ab

Please sign in to comment.