Skip to content

Commit

Permalink
consolidate executors - just fork threads - and schedulers. better co…
Browse files Browse the repository at this point in the history
…ncurrency control/management
  • Loading branch information
Hellblazer committed Jun 28, 2024
1 parent fb7f118 commit e40b25c
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 46 deletions.
9 changes: 3 additions & 6 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public CHOAM(Parameters params) {
roundScheduler = new RoundScheduler("CHOAM" + params.member().getId() + params.context().getId(),
params.context().timeToLive());
combine.register(_ -> roundScheduler.tick());
session = new Session(params, service());
session = new Session(params, service(), scheduler);
}

public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize, Digest initial, int crowns,
Expand Down Expand Up @@ -360,10 +360,6 @@ public void stop() {
} catch (Throwable e) {
}
session.cancelAll();
try {
session.stop();
} catch (Throwable e) {
}
final var c = current.get();
if (c != null) {
try {
Expand Down Expand Up @@ -760,7 +756,8 @@ private void recover(HashedCertifiedBlock anchor) {
log.info("Recovering from: {} height: {} on: {}", anchor.hash, anchor.height(), params.member().getId());
cancelSynchronization();
cancelBootstrap();
futureBootstrap.set(new Bootstrapper(anchor, params, store, comm).synchronize().whenComplete((s, t) -> {
futureBootstrap.set(
new Bootstrapper(anchor, params, store, comm, scheduler).synchronize().whenComplete((s, t) -> {
if (t == null) {
try {
synchronize(s);
Expand Down
28 changes: 12 additions & 16 deletions choam/src/main/java/com/salesforce/apollo/choam/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,26 @@
*/
public class Session {

private final static Logger log = LoggerFactory.getLogger(
Session.class);
private final Limiter<Void> limiter;
private final Parameters params;
private final Function<SubmittedTransaction, SubmitResult> service;
private final Map<Digest, SubmittedTransaction> submitted = new ConcurrentHashMap<>();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
Thread.ofVirtual()
.factory());
private final AtomicInteger nonce = new AtomicInteger();
private final static Logger log = LoggerFactory.getLogger(Session.class);

public Session(Parameters params, Function<SubmittedTransaction, SubmitResult> service) {
private final Limiter<Void> limiter;
private final Parameters params;
private final Function<SubmittedTransaction, SubmitResult> service;
private final Map<Digest, SubmittedTransaction> submitted = new ConcurrentHashMap<>();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final ScheduledExecutorService scheduler;
private final AtomicInteger nonce = new AtomicInteger();

public Session(Parameters params, Function<SubmittedTransaction, SubmitResult> service,
ScheduledExecutorService scheduler) {
this.params = params;
this.service = service;
final var metrics = params.metrics();
this.limiter = params.txnLimiterBuilder()
.build(params.member().getId().shortString(),
metrics == null ? EmptyMetricRegistry.INSTANCE : metrics.getMetricRegistry(
params.context().getId().shortString() + ".txnLimiter"));
this.scheduler = scheduler;
}

public static Transaction transactionOf(Digest source, int nonce, Message message, Signer signer) {
Expand Down Expand Up @@ -117,10 +117,6 @@ public void setView(HashedCertifiedBlock v) {
}
}

public void stop() {
scheduler.shutdown();
}

/**
* Submit a transaction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -60,11 +59,12 @@ public class Bootstrapper {
private volatile HashedCertifiedBlock genesis;

public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store,
CommonCommunications<Terminal, Concierge> bootstrapComm) {
CommonCommunications<Terminal, Concierge> bootstrapComm, ScheduledExecutorService scheduler) {
this.anchor = anchor;
this.params = params;
this.store = store;
this.comms = bootstrapComm;
this.scheduler = scheduler;
CertifiedBlock g = store.getCertifiedBlock(ULong.valueOf(0));
store.put(anchor);
if (g != null) {
Expand All @@ -75,7 +75,6 @@ public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store,
log.info("Restore using no prior state on: {}", params.member().getId());
lastCheckpoint = null;
}
scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
}

public CompletableFuture<SynchronizedState> synchronize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void func() throws Exception {
});
return SubmitResult.newBuilder().setResult(Result.PUBLISHED).build();
};
Session session = new Session(params, service);
Session session = new Session(params, service,
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()));
session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder()
.setBlock(Block.newBuilder()
.setHeader(
Expand Down Expand Up @@ -136,7 +137,7 @@ public void scalingTest() throws Exception {
MetricRegistry reg = new MetricRegistry();
Timer latency = reg.timer("Transaction latency");

Session session = new Session(params, service);
Session session = new Session(params, service, scheduler);
session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder()
.setBlock(Block.newBuilder()
.setHeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void smoke() throws Exception {
context)
.setMember(member)
.build()), store,
comms);
comms, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()));

CompletableFuture<SynchronizedState> syncFuture = boot.synchronize();
SynchronizedState state = syncFuture.get(10, TimeUnit.SECONDS);
Expand Down
19 changes: 14 additions & 5 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class View {
private final RingCommunications<Participant, Fireflies> gossiper;
private final AtomicBoolean introduced = new AtomicBoolean();
private final Map<String, Consumer<ViewChange>> viewChangeListeners = new HashMap<>();
private final Executor viewNotificationQueue;
private final Semaphore viewSerialization = new Semaphore(1);
private final FireflyMetrics metrics;
private final Node node;
private final Map<Digest, SVU> observations = new ConcurrentSkipListMap<>();
Expand All @@ -111,7 +111,6 @@ public class View {
private final Verifiers verifiers;
private final ScheduledExecutorService scheduler;
private volatile ScheduledFuture<?> futureGossip;
private volatile boolean boostrap = false;

public View(DynamicContext<Participant> context, ControlledIdentifierMember member, String endpoint,
EventValidation validation, Verifiers verifiers, Router communications, Parameters params,
Expand Down Expand Up @@ -144,7 +143,6 @@ public View(DynamicContext<Participant> context, ControlledIdentifierMember memb
gossiper.ignoreSelf();
this.validation = validation;
this.verifiers = verifiers;
viewNotificationQueue = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory());
viewChange = new ReentrantReadWriteLock(true);
}

Expand Down Expand Up @@ -243,6 +241,7 @@ public void stop() {
if (!started.compareAndSet(true, false)) {
return;
}
viewSerialization.release(10000);
roundTimers.reset();
comm.deregister(context.getId());
pendingRebuttals.clear();
Expand Down Expand Up @@ -328,7 +327,6 @@ boolean addToView(NoteWrapper note) {
}

void bootstrap(NoteWrapper nw, Duration dur) {
boostrap = true;
viewManagement.bootstrap(nw, dur);
}

Expand Down Expand Up @@ -415,13 +413,24 @@ void notifyListeners(List<SelfAddressingIdentifier> joining, List<Digest> leavin
joining.stream().map(SelfAddressingIdentifier::getDigest).toList(),
Collections.unmodifiableList(leaving));
viewChangeListeners.forEach((key, value) -> {
viewNotificationQueue.execute(Utils.wrapped(() -> {
Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
viewSerialization.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
if (!started.get()) {
return;
}
try {
log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", key,
currentView(), context.size(), joining.size(), leaving.size(), node.getId());
value.accept(viewChange);
} catch (Throwable e) {
log.error("error in view change listener: {} on: {} ", key, node.getId(), e);
} finally {
viewSerialization.release();
}
}, log));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void start(Duration duration, Predicate<FernetServerInterceptor.HashedTok
var initialDelay = Entropy.nextBitsStreamLong(duration.toMillis());
log.info("Starting Reliable Broadcaster[{}] for {}", context.getId(), member.getId());
comm.register(context.getId(), new Service(), validator);
var scheduler = Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory());
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)),
initialDelay, TimeUnit.MILLISECONDS);
}
Expand Down
29 changes: 17 additions & 12 deletions sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.sql.Connection;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -47,15 +49,16 @@
*/
public class Emulator {

private final AtomicReference<Digest> hash;
private final AtomicLong height = new AtomicLong(0);
private final ReentrantLock lock = new ReentrantLock();
private final Mutator mutator;
private final Parameters params;
private final SqlStateMachine ssm;
private final AtomicBoolean started = new AtomicBoolean();
private final TransactionExecutor txnExec;
private final AtomicInteger txnIndex = new AtomicInteger(0);
private final AtomicReference<Digest> hash;
private final AtomicLong height = new AtomicLong(0);
private final ReentrantLock lock = new ReentrantLock();
private final Mutator mutator;
private final Parameters params;
private final SqlStateMachine ssm;
private final AtomicBoolean started = new AtomicBoolean();
private final TransactionExecutor txnExec;
private final AtomicInteger txnIndex = new AtomicInteger(0);
private final ScheduledExecutorService scheduler;

public Emulator() throws IOException {
this(DigestAlgorithm.DEFAULT.getOrigin().prefix(Entropy.nextBitsStreamLong()));
Expand All @@ -64,11 +67,13 @@ public Emulator() throws IOException {
public Emulator(Digest base) throws IOException {
this(new SqlStateMachine(DigestAlgorithm.DEFAULT.getOrigin(),
String.format("jdbc:h2:mem:emulation-%s-%s", base, Entropy.nextBitsStreamLong()),
new Properties(), Files.createTempDirectory("emulation").toFile()), base);
new Properties(), Files.createTempDirectory("emulation").toFile()), base,
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()));
}

public Emulator(SqlStateMachine ssm, Digest base) {
public Emulator(SqlStateMachine ssm, Digest base, ScheduledExecutorService scheduler) throws IOException {
this.ssm = ssm;
this.scheduler = scheduler;
txnExec = this.ssm.getExecutor();
hash = new AtomicReference<>(base);
SecureRandom entropy;
Expand Down Expand Up @@ -97,7 +102,7 @@ public Emulator(SqlStateMachine ssm, Digest base) {
} finally {
lock.unlock();
}
});
}, scheduler);
session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder()
.setBlock(Block.newBuilder()
.setHeader(
Expand Down

0 comments on commit e40b25c

Please sign in to comment.