Skip to content

Commit

Permalink
consolidate executors - just fork threads - and schedulers. better co…
Browse files Browse the repository at this point in the history
…ncurrency control/management
  • Loading branch information
Hellblazer committed Jun 28, 2024
1 parent 1aab2ab commit fb7f118
Show file tree
Hide file tree
Showing 22 changed files with 123 additions and 103 deletions.
69 changes: 40 additions & 29 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,10 @@ public class CHOAM {
private final ReliableBroadcaster combine;
private final CommonCommunications<Terminal, Concierge> comm;
private final AtomicReference<Committee> current = new AtomicReference<>();
private final ExecutorService executions;
private final AtomicReference<CompletableFuture<SynchronizedState>> futureBootstrap = new AtomicReference<>();
private final AtomicReference<ScheduledFuture<?>> futureSynchronization = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> genesis = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> head = new AtomicReference<>();
private final ExecutorService linear;
private final AtomicReference<nextView> next = new AtomicReference<>();
private final AtomicReference<Digest> nextViewId = new AtomicReference<>();
private final Parameters params;
Expand All @@ -101,13 +99,13 @@ public class CHOAM {
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final ScheduledExecutorService scheduler;
private final Semaphore linear = new Semaphore(1);
private volatile AtomicBoolean ongoingJoin;

public CHOAM(Parameters params) {
scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
executions = Executors.newVirtualThreadPerTaskExecutor();
pendingViews.add(params.context().getId(), params.context().delegate());

rotateViewKeys();
Expand All @@ -118,14 +116,17 @@ public CHOAM(Parameters params) {
combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(),
params.metrics() == null ? null : params.metrics().getCombineMetrics(),
adapter);
linear = Executors.newSingleThreadExecutor(
Thread.ofVirtual().name("Linear " + params.member().getId()).factory());
combine.registerHandler((_, messages) -> {
try {
linear.execute(Utils.wrapped(() -> combine(messages), log));
} catch (RejectedExecutionException e) {
// ignore
}
Thread.ofVirtual().start(() -> {
if (!started.get()) {
return;
}
try {
combine(messages);
} catch (Throwable t) {
log.error("Failed to combine messages on: {}", params.member().getId(), t);
}
});
});
head.set(new NullBlock(params.digestAlgorithm()));
view.set(new NullBlock(params.digestAlgorithm()));
Expand Down Expand Up @@ -353,10 +354,7 @@ public void stop() {
if (!started.compareAndSet(true, false)) {
return;
}
try {
linear.shutdownNow();
} catch (Throwable e) {
}
linear.release(10000);
try {
scheduler.shutdownNow();
} catch (Throwable e) {
Expand All @@ -366,10 +364,6 @@ public void stop() {
session.stop();
} catch (Throwable e) {
}
try {
executions.shutdownNow();
} catch (Throwable e) {
}
final var c = current.get();
if (c != null) {
try {
Expand Down Expand Up @@ -593,7 +587,7 @@ private void execute(List<Transaction> execs) {
try {
params.processor()
.execute(index, CHOAM.hashOf(exec, params.digestAlgorithm()), exec,
stxn == null ? null : stxn.onCompletion(), executions);
stxn == null ? null : stxn.onCompletion());
} catch (Throwable t) {
log.error("Exception processing transaction: {} block: {} height: {} on: {}", hash, h.hash, h.height(),
params.member().getId());
Expand Down Expand Up @@ -951,14 +945,13 @@ private void synchronize(SynchronizedState state) {
log.info("Synchronized, resuming view: {} deferred blocks: {} on: {}",
state.lastCheckpoint() != null ? state.lastCheckpoint().hash : state.genesis().hash, pending.size(),
params.member().getId());
try {
linear.execute(Utils.wrapped(() -> {
transitions.synchd();
transitions.combine();
}, log));
} catch (RejectedExecutionException e) {
// ignore
}
Thread.ofVirtual().start(Utils.wrapped(() -> {
if (!started.get()) {
return;
}
transitions.synchd();
transitions.combine();
}, log));
}

private void synchronizedProcess(CertifiedBlock certifiedBlock) {
Expand Down Expand Up @@ -1052,7 +1045,7 @@ default void endBlock(ULong height, Digest hash) {
}

@SuppressWarnings("rawtypes")
void execute(int index, Digest hash, Transaction tx, CompletableFuture onComplete, Executor executor);
void execute(int index, Digest hash, Transaction tx, CompletableFuture onComplete);

default void genesis(Digest hash, List<Transaction> initialization) {
}
Expand Down Expand Up @@ -1209,7 +1202,25 @@ public void cancelTimer(String timer) {

@Override
public void combine() {
CHOAM.this.combine();
Thread.ofVirtual().start(Utils.wrapped(() -> {
if (!started.get()) {
return;
}
try {
linear.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
try {
CHOAM.this.combine();
} finally {
linear.release();
}
}, log));
if (linear.getQueueLength() > 0) {
log.info("Linear Q: {} on: {}", linear.getQueueLength(), params.member().getId());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public static class Builder implements Cloneable {
private Supplier<KERL_> kerl = () -> KERL_.getDefaultInstance();
private SigningMember member;
private ChoamMetrics metrics;
private TransactionExecutor processor = (i, h, t, f, exec) -> {
private TransactionExecutor processor = (i, h, t, f) -> {
};
private BiConsumer<HashedBlock, CheckpointState> restorer = (height, checkpointState) -> {
};
Expand Down
15 changes: 10 additions & 5 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,13 @@ public class Producer {
private final Transitions transitions;
private final ViewContext view;
private final Digest nextViewId;
private final ExecutorService serialize = Executors.newSingleThreadExecutor();
private final Semaphore serialize = new Semaphore(1);
private final ViewAssembly assembly;
private final int maxEpoch;
private final ScheduledExecutorService scheduler;
private volatile boolean assembled = false;

public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label,
ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
assert view != null;
this.view = view;
this.previousBlock.set(lastBlock);
Expand Down Expand Up @@ -150,7 +148,7 @@ public void stop() {
return;
}
log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId());
serialize.shutdown();
serialize.release(10000);
controller.stop();
coordinator.stop();
ds.close();
Expand Down Expand Up @@ -353,13 +351,20 @@ private void reconfigure() {
}

private void serial(List<ByteString> preblock, Boolean last) {
serialize.execute(() -> {
Thread.ofVirtual().start(() -> {
try {
serialize.acquire();
create(preblock, last);
} catch (Throwable t) {
log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t);
} finally {
serialize.release();
}
});
var awaiting = serialize.getQueueLength();
if (awaiting > 0) {
log.error("Serialize: {} on: {}", awaiting, params().member().getId());
}
}

private PendingBlock validate(Validate v) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ public void start() {

void assemble(List<Assemblies> asses) {
if (!started.get()) {
if (!asses.isEmpty()) {
var viewz = asses.stream().flatMap(a -> a.getViewsList().stream()).toList();
var joinz = asses.stream().flatMap(a -> a.getJoinsList().stream()).toList();
log.debug("Not started, ignoring assemblies: {} joins: {} views: {} on: {}", asses.size(), joinz.size(),
viewz.size(), params().member().getId());
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void tearDown() throws Exception {
}

private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, Context<Member> context) {
final CHOAM.TransactionExecutor processor = (index, hash, t, f, executor) -> {
final CHOAM.TransactionExecutor processor = (index, hash, t, f) -> {
if (f != null) {
f.completeAsync(Object::new, executor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws
}

private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, boolean testSubject) {
final TransactionExecutor processor = (index, hash, t, f, executor) -> {
final TransactionExecutor processor = (_, _, _, f) -> {
if (f != null) {
f.completeAsync(Object::new, executor);
f.completeAsync(Object::new);
}
};
params.getProducer().ethereal().setSigner(m);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void before() throws Exception {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void execute(int index, Digest hash, Transaction t, CompletableFuture f, Executor executor) {
public void execute(int index, Digest hash, Transaction t, CompletableFuture f) {
if (f != null) {
f.completeAsync(() -> new Object(), executor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ assert parentsOnPreviousLevel(u) >= quorum : "Parents: " + Arrays.asList(u.paren
private ByteString getData(int level) {
if (level < conf.lastLevel()) {
if (ds != null) {
log.info("Requesting timing unit: {} on: {}", level, conf.logLabel());
log.trace("Requesting timing unit: {} on: {}", level, conf.logLabel());
return ds.getData();
}
log.info("No datasource for timing unit: {} on: {}", level, conf.logLabel());
log.trace("No datasource for timing unit: {} on: {}", level, conf.logLabel());
return ByteString.EMPTY;
}
Unit timingUnit = lastTiming.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -46,25 +45,26 @@
* @author hal.hildebrand
*/
public class ChRbcGossip {
private static final Logger log = LoggerFactory.getLogger(ChRbcGossip.class);

private static final Logger log = LoggerFactory.getLogger(
ChRbcGossip.class);
private final CommonCommunications<Gossiper, GossiperService> comm;
private final Digest id;
private final SigningMember member;
private final EtherealMetrics metrics;
private final Processor processor;
private final SliceIterator<Gossiper> ring;
private final AtomicBoolean started = new AtomicBoolean();
private final Terminal terminal = new Terminal();
private volatile ScheduledFuture<?> scheduled;
private final CommonCommunications<Gossiper, GossiperService> comm;
private final Digest id;
private final SigningMember member;
private final EtherealMetrics metrics;
private final Processor processor;
private final SliceIterator<Gossiper> ring;
private final AtomicBoolean started = new AtomicBoolean();
private final Terminal terminal = new Terminal();
private final ScheduledExecutorService scheduler;
private volatile ScheduledFuture<?> scheduled;

public ChRbcGossip(Digest id, SigningMember member, Collection<Member> membership, Processor processor,
Router communications, EtherealMetrics m, ScheduledExecutorService scheduler) {
this.processor = processor;
this.member = member;
this.metrics = m;
this.id = id;
this.scheduler = scheduler;
comm = communications.create(member, id, terminal, getClass().getCanonicalName(),
r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r),
getCreate(metrics), Gossiper.getLocalLoopback(member));
Expand All @@ -90,7 +90,6 @@ public void start(Duration duration, Predicate<FernetServerInterceptor.HashedTok
log.trace("Starting GossipService[{}] on: {}", id, member.getId());
comm.register(id, terminal, validator);
try {
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
gossip(duration, scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
import com.salesforce.apollo.choam.proto.FoundationSeal;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContextImpl;
import com.salesforce.apollo.context.ViewChange;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.delphinius.Oracle;
Expand All @@ -40,7 +40,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -201,7 +201,8 @@ public void before() throws Exception {
Duration.ofMinutes(1),
"jdbc:h2:mem:%s-dht".formatted(digest),
checkpointDirBase, Duration.ofMillis(10), 0.00125,
Duration.ofMinutes(1), 3, 10, 0.1);
Duration.ofMinutes(1), 3, Duration.ofMillis(100),
10, 0.1);
var node = new ProcessContainerDomain(group, member, pdParams, params, RuntimeParameters.newBuilder()
.setFoundation(
sealed)
Expand All @@ -223,17 +224,17 @@ public void smokin() throws Exception {
final var seeds = Collections.singletonList(
new Seed(domains.getFirst().getMember().getIdentifier().getIdentifier(), EndpointProvider.allocatePort()));
domains.forEach(d -> {
BiConsumer<Context, Digest> c = (context, viewId) -> {
if (context.cardinality() == CARDINALITY) {
System.out.printf("Full view: %s members: %s on: %s%n", viewId, context.cardinality(),
d.getMember().getId());
Consumer<ViewChange> c = viewChange -> {
if (viewChange.context().cardinality() == CARDINALITY) {
System.out.printf("Full view: %s members: %s on: %s%n", viewChange.diadem(),
viewChange.context().cardinality(), d.getMember().getId());
countdown.countDown();
} else {
System.out.printf("Members joining: %s members: %s on: %s%n", viewId, context.cardinality(),
d.getMember().getId());
System.out.printf("Members joining: %s members: %s on: %s%n", viewChange.diadem(),
viewChange.context().cardinality(), d.getMember().getId());
}
};
d.getFoundation().register(c);
d.getFoundation().register("foo", c);
});
// start seed
final var started = new AtomicReference<>(new CountDownLatch(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected Consumer<ViewChange> listener() {
}

protected void startServices() {
dht.start(params.gossipDuration());
dht.start(parameters.kerlSpaceDuration);
}

protected void stopServices() {
Expand All @@ -143,7 +143,7 @@ protected void stopServices() {

public record ProcessDomainParameters(String dbURL, Duration dhtOperationsTimeout, String dhtDbUrl,
Path checkpointBaseDir, Duration dhtOpsFrequency, double dhtFpr,
Duration dhtEventValidTO, int dhtBias, int jdbcMaxConnections,
double dhtPbyz) {
Duration dhtEventValidTO, int dhtBias, Duration kerlSpaceDuration,
int jdbcMaxConnections, double dhtPbyz) {
}
}
Loading

0 comments on commit fb7f118

Please sign in to comment.