From 1aab2ab1c088811a7d8babc5b2177c8f1902c23c Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Wed, 26 Jun 2024 19:26:53 -0700 Subject: [PATCH] Max length BLOBs for all binary values. reuse/close scheduled executors. Pretty massive refactoring of the stereotomy and thoth schemas. :: sigh :: --- .../com/salesforce/apollo/choam/CHOAM.java | 61 ++-- .../apollo/choam/GenesisAssembly.java | 60 ++-- .../com/salesforce/apollo/choam/Producer.java | 11 +- .../com/salesforce/apollo/choam/Session.java | 4 + .../choam/support/CheckpointAssembler.java | 2 +- .../apollo/choam/GenesisAssemblyTest.java | 4 +- .../apollo/cryptography/Digest.java | 18 +- .../apollo/cryptography/DigestAlgorithm.java | 14 +- .../salesforce/apollo/ethereal/Creator.java | 2 + .../apollo/ethereal/DataSource.java | 2 +- .../salesforce/apollo/ethereal/Ethereal.java | 3 +- .../ethereal/memberships/ChRbcGossip.java | 5 +- .../apollo/ethereal/EtherealTest.java | 7 +- .../salesforce/apollo/fireflies/Binding.java | 17 +- .../com/salesforce/apollo/fireflies/View.java | 38 +-- .../apollo/fireflies/ViewManagement.java | 31 +- .../apollo/gorgoneion/Gorgoneion.java | 15 +- .../apollo/archipelago/MtlsClient.java | 5 + .../apollo/archipelago/MtlsServer.java | 8 +- .../salesforce/apollo/ring/SliceIterator.java | 13 +- .../apollo/context/ContextTests.java | 6 +- .../apollo/context/StaticContextTest.java | 6 +- .../model/stereotomy/ShardedKERLTest.java | 6 +- .../src/main/resources/sql-state/internal.xml | 2 +- .../main/resources/stereotomy/stereotomy.xml | 173 +++++----- schemas/src/main/resources/thoth/thoth.xml | 210 ++++++------ .../apollo/stereotomy/db/UniKERL.java | 35 +- .../apollo/stereotomy/StereotomyTests.java | 4 +- .../com/salesforce/apollo/thoth/KerlDHT.java | 2 +- .../salesforce/apollo/thoth/KerlSpace.java | 53 +-- .../apollo/thoth/KerlSpaceTest.java | 4 +- .../com/salesforce/apollo/thoth/KerlTest.java | 4 +- .../java/com/chiralbehaviors/tron/Fsm.java | 313 ++++++++---------- 33 files changed, 577 insertions(+), 561 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 10a1b8fc7c..55f0aac294 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -100,9 +100,11 @@ public class CHOAM { private final TransSubmission txnSubmission = new TransSubmission(); private final AtomicReference view = new AtomicReference<>(); private final PendingViews pendingViews = new PendingViews(); + private final ScheduledExecutorService scheduler; private volatile AtomicBoolean ongoingJoin; public CHOAM(Parameters params) { + scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); this.params = params; executions = Executors.newVirtualThreadPerTaskExecutor(); @@ -351,11 +353,19 @@ public void stop() { if (!started.compareAndSet(true, false)) { return; } - session.cancelAll(); try { linear.shutdownNow(); } catch (Throwable e) { } + try { + scheduler.shutdownNow(); + } catch (Throwable e) { + } + session.cancelAll(); + try { + session.stop(); + } catch (Throwable e) { + } try { executions.shutdownNow(); } catch (Throwable e) { @@ -1393,31 +1403,28 @@ private void join(View view) { var joined = new AtomicInteger(); var halt = new AtomicBoolean(false); ongoingJoin = halt; - Thread.ofVirtual().start(Utils.wrapped(() -> { - log.trace("Starting join of: {} diadem {} on: {}", 
nextViewId.get(), Digest.from(view.getDiadem()), - params.member().getId()); - - var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); - AtomicReference action = new AtomicReference<>(); - var attempts = new AtomicInteger(); - action.set(() -> { - log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(), - halt.get(), joined.get(), view.getMajority(), params.member().getId()); - if (!halt.get() & joined.get() < view.getMajority()) { - join(view, servers, joined); - if (joined.get() >= view.getMajority()) { - ongoingJoin = null; - log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), - Digest.from(view.getDiadem()), joined.get(), params.member().getId()); - } else if (!halt.get()) { - log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), - Digest.from(view.getDiadem()), joined.get(), params.member().getId()); - scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); - } + log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); + var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + AtomicReference action = new AtomicReference<>(); + var attempts = new AtomicInteger(); + action.set(() -> { + log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(), + halt.get(), joined.get(), view.getMajority(), params.member().getId()); + if (!halt.get() & joined.get() < view.getMajority()) { + join(view, servers, joined); + if (joined.get() >= view.getMajority()) { + ongoingJoin = null; + log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), + Digest.from(view.getDiadem()), joined.get(), params.member().getId()); + } else if (!halt.get()) { + log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), + Digest.from(view.getDiadem()), joined.get(), params.member().getId()); + scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); } - }); - scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); - }, log())); + } + }); + scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); } private void join(View view, Collection members, AtomicInteger joined) { @@ -1520,7 +1527,7 @@ private class Associate extends Administration { var pv = pendingViews(); producer = new Producer(nextViewId.get(), new ViewContext(context, params, pv, signer, validators, constructBlock()), - head.get(), checkpoint.get(), getLabel()); + head.get(), checkpoint.get(), getLabel(), scheduler); producer.start(); } @@ -1575,7 +1582,7 @@ private Formation() { .setVm(inView) .setSignature(params.member().sign(inView.toByteString()).toSig()) .build(); - assembly = new GenesisAssembly(vc, comm, svm, getLabel()); + assembly = new GenesisAssembly(vc, comm, svm, getLabel(), scheduler); log.info("Setting next view id to genesis: {} on: {}", params.genesisViewId(), params.member().getId()); nextViewId.set(params.genesisViewId()); } else { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index a13861270c..85638710f9 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -16,7 +16,6 @@ import com.salesforce.apollo.choam.support.HashedBlock; import com.salesforce.apollo.choam.support.HashedCertifiedBlock; 
import com.salesforce.apollo.choam.support.HashedCertifiedBlock.NullBlock; -import com.salesforce.apollo.choam.support.OneShot; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.StaticContext; import com.salesforce.apollo.cryptography.Digest; @@ -32,7 +31,7 @@ import java.security.PublicKey; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -45,26 +44,27 @@ * @author hal.hildebrand */ public class GenesisAssembly implements Genesis { - private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class); - private final Ethereal controller; - private final ChRbcGossip coordinator; - private final SignedViewMember genesisMember; - private final Map nextAssembly; - private final AtomicBoolean published = new AtomicBoolean(); - private final Map slate = new ConcurrentHashMap<>(); - private final AtomicBoolean started = new AtomicBoolean(); - private final Transitions transitions; - private final ViewContext view; - private final Map witnesses = new ConcurrentHashMap<>(); - private final OneShot ds; - private final List pendingValidations = new ArrayList<>(); - private volatile Thread blockingThread; - private volatile HashedBlock reconfiguration; + private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class); + private final Ethereal controller; + private final ChRbcGossip coordinator; + private final SignedViewMember genesisMember; + private final Map nextAssembly; + private final AtomicBoolean published = new AtomicBoolean(); + private final Map slate = new ConcurrentHashMap<>(); + private final AtomicBoolean started = new AtomicBoolean(); + private final Transitions transitions; + private final ViewContext view; + private final Map witnesses = new ConcurrentHashMap<>(); + private final BlockingDeque ds; + private final List pendingValidations = new ArrayList<>(); + private final ScheduledExecutorService scheduler; + private volatile HashedBlock reconfiguration; public GenesisAssembly(ViewContext vc, CommonCommunications comms, SignedViewMember genesisMember, - String label) { + String label, ScheduledExecutorService scheduler) { view = vc; - ds = new OneShot(); + this.scheduler = scheduler; + ds = new LinkedBlockingDeque<>(1024); Digest hash = view.context().getId(); nextAssembly = ((Set) ((Context) view.pendingViews().last().context()).bftSubset( hash)).stream().collect(Collectors.toMap(Member::getId, m -> m)); @@ -99,7 +99,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, transitions::process, transitions::nextEpoch, label); coordinator = new ChRbcGossip(reContext.getId(), params().member(), nextAssembly.values(), controller.processor(), params().communications(), - params().metrics() == null ? null : params().metrics().getGensisMetrics()); + params().metrics() == null ? 
null : params().metrics().getGensisMetrics(), + scheduler); log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(), reContext.getId(), nextAssembly.keySet(), params().member().getId()); } @@ -117,7 +118,7 @@ public void certify() { var validate = view.generateValidation(reconfiguration); log.debug("Certifying genesis block: {} for: {} slate: {} on: {}", reconfiguration.hash, view.context().getId(), slate.keySet().stream().sorted().toList(), params().member().getId()); - ds.setValue(validate.toByteString()); + ds.add(validate.toByteString()); witnesses.put(params().member(), validate); pendingValidations.forEach(v -> certify(v)); } @@ -140,7 +141,7 @@ public void gather() { var join = Join.newBuilder().setMember(genesisMember).setKerl(params().kerl().get()).build(); slate.put(params().member().getId(), join); - ds.setValue(join.toByteString()); + ds.add(join.toByteString()); coordinator.start(params().producer().gossipDuration()); controller.start(); } @@ -230,11 +231,6 @@ public void stop() { log.trace("Stopping genesis assembly: {} on: {}", view.context().getId(), params().member().getId()); coordinator.stop(); controller.stop(); - final var cur = blockingThread; - blockingThread = null; - if (cur != null) { - cur.interrupt(); - } } private void certify(Validate v) { @@ -262,11 +258,11 @@ private DataSource dataSource() { return ByteString.EMPTY; } try { - blockingThread = Thread.currentThread(); - final var take = ds.get(); - return take; - } finally { - blockingThread = null; + var data = ds.poll(100, TimeUnit.MILLISECONDS); + return data == null ? ByteString.EMPTY : data; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; } }; } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 9538fe8ce9..b1405093d6 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -52,13 +52,15 @@ public class Producer { private final Transitions transitions; private final ViewContext view; private final Digest nextViewId; - private final Executor serialize = Executors.newSingleThreadExecutor( - Thread.ofVirtual().factory()); + private final ExecutorService serialize = Executors.newSingleThreadExecutor(); private final ViewAssembly assembly; private final int maxEpoch; + private final ScheduledExecutorService scheduler; private volatile boolean assembled = false; - public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) { + public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label, + ScheduledExecutorService scheduler) { + this.scheduler = scheduler; assert view != null; this.view = view; this.previousBlock.set(lastBlock); @@ -99,7 +101,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial, this::newEpoch, label); coordinator = new ChRbcGossip(view.context().getId(), params().member(), view.membership(), - controller.processor(), params().communications(), producerMetrics); + controller.processor(), params().communications(), producerMetrics, scheduler); log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId()); var onConsensus = new CompletableFuture(); @@ 
-148,6 +150,7 @@ public void stop() { return; } log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId()); + serialize.shutdown(); controller.stop(); coordinator.stop(); ds.close(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Session.java b/choam/src/main/java/com/salesforce/apollo/choam/Session.java index 70d2148eae..f21d652adc 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Session.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Session.java @@ -117,6 +117,10 @@ public void setView(HashedCertifiedBlock v) { } } + public void stop() { + scheduler.shutdown(); + } + /** * Submit a transaction. * diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java index 4d4f8fd903..e66789285a 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java @@ -118,7 +118,7 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) { diadem.compactWrapped(), member.getId()); var ringer = new SliceIterator<>("Assembly[%s:%s]".formatted(diadem.compactWrapped(), member.getId()), member, - committee, comms); + committee, comms, scheduler); ringer.iterate((link) -> { log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId()); return gossip(link); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index b113b13c8e..c44a2416e3 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -40,6 +40,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -175,7 +176,8 @@ public Block reconfigure(Map joining, Digest nextViewId, HashedBlo .setVm(vm) .setSignature(((SigningMember) m).sign(vm.toByteString()).toSig()) .build(); - genii.put(m, new GenesisAssembly(view, comms.get(m), svm, m.getId().toString())); + genii.put(m, new GenesisAssembly(view, comms.get(m), svm, m.getId().toString(), + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()))); }); try { diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java index 2c1f7761d3..15bb3aa2b1 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java @@ -16,13 +16,15 @@ import java.util.UUID; import java.util.stream.Stream; +import static com.salesforce.apollo.cryptography.DigestAlgorithm.EMPTY; + /** * A computed digest * * @author hal.hildebrand */ public class Digest implements Comparable { - public static final Digest NONE = new Digest(DigestAlgorithm.NONE, new byte[0]) { + public static final Digest NONE = new Digest(DigestAlgorithm.NONE, new long[] { 0L }) { @Override public String toString() { @@ -74,11 +76,15 @@ public Digest(DigestAlgorithm algo, long[] hash) { public Digest(Digeste d) { algorithm = DigestAlgorithm.fromDigestCode(d.getType()); - assert d.getHashCount() == 
algorithm.longLength(); - hash = new long[d.getHashCount()]; - int i = 0; - for (long l : d.getHashList()) { - hash[i++] = l; + if (algorithm.equals(DigestAlgorithm.NONE)) { + hash = EMPTY; + } else { + assert d.getHashCount() == algorithm.longLength(); + hash = new long[d.getHashCount()]; + int i = 0; + for (long l : d.getHashList()) { + hash[i++] = l; + } } } diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/DigestAlgorithm.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/DigestAlgorithm.java index 25bb3adb77..7f64934d7c 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/DigestAlgorithm.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/DigestAlgorithm.java @@ -222,7 +222,12 @@ public byte digestCode() { @Override public int digestLength() { - return 0; + return 8; + } + + @Override + public int longLength() { + return 1; } @Override @@ -237,12 +242,12 @@ public Digest getOrigin() { @Override public byte[] hashOf(byte[] bytes, int len) { - return EMPTY; + return EMPTY_BYTES; } @Override public byte[] hashOf(InputStream is) { - return EMPTY; + return EMPTY_BYTES; } }, SHA2_256 { @Override @@ -316,7 +321,8 @@ public int digestLength() { public static final DigestAlgorithm DEFAULT = BLAKE2B_256; public static final long MAX_UNSIGNED_LONG = -1L; - private static final byte[] EMPTY = new byte[0]; + public static final long[] EMPTY = new long[] { 0L }; + public static final byte[] EMPTY_BYTES = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 }; private static final long[] LAST_32 = new long[4]; private static final long[] LAST_64 = new long[8]; private static final ThreadLocal MESSAGE_DIGEST = ThreadLocal.withInitial(() -> new DigestCache()); diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Creator.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Creator.java index 2a8d8de5fd..307851b7eb 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Creator.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Creator.java @@ -173,8 +173,10 @@ assert parentsOnPreviousLevel(u) >= quorum : "Parents: " + Arrays.asList(u.paren private ByteString getData(int level) { if (level < conf.lastLevel()) { if (ds != null) { + log.info("Requesting timing unit: {} on: {}", level, conf.logLabel()); return ds.getData(); } + log.info("No datasource for timing unit: {} on: {}", level, conf.logLabel()); return ByteString.EMPTY; } Unit timingUnit = lastTiming.poll(); diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/DataSource.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/DataSource.java index a61d8afb55..4c19d3a077 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/DataSource.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/DataSource.java @@ -10,8 +10,8 @@ /** * @author hal.hildebrand - * */ +@FunctionalInterface public interface DataSource { ByteString getData(); diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java index 6cac3b2bbf..67ea81f4e3 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java @@ -73,7 +73,7 @@ private Ethereal(String label, Config conf, int maxSerializedSize, DataSource ds } private static ThreadPoolExecutor consumer(String label) { - return new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new 
PriorityBlockingQueue<>(), + return new ThreadPoolExecutor(1, 1, 10, TimeUnit.MINUTES, new PriorityBlockingQueue<>(), Thread.ofVirtual().name("Ethereal Consumer[" + label + "]").factory(), (r, t) -> log.trace("Shutdown, cannot consume unit")); } @@ -218,6 +218,7 @@ public void stop() { } log.trace("Stopping Ethereal on: {}", config.logLabel()); completeIt(); + consumer.shutdown(); consumer.getQueue().clear(); // Flush any pending consumers creator.stop(); epochs.values().forEach(epoch::close); diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java index e44fdc1ff2..482b267a55 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java @@ -60,7 +60,7 @@ public class ChRbcGossip { private volatile ScheduledFuture scheduled; public ChRbcGossip(Digest id, SigningMember member, Collection membership, Processor processor, - Router communications, EtherealMetrics m) { + Router communications, EtherealMetrics m, ScheduledExecutorService scheduler) { this.processor = processor; this.member = member; this.metrics = m; @@ -68,7 +68,8 @@ public ChRbcGossip(Digest id, SigningMember member, Collection membershi comm = communications.create(member, id, terminal, getClass().getCanonicalName(), r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r), getCreate(metrics), Gossiper.getLocalLoopback(member)); - ring = new SliceIterator<>("ChRbcGossip[%s on: %s]".formatted(id, member.getId()), member, membership, comm); + ring = new SliceIterator<>("ChRbcGossip[%s on: %s]".formatted(id, member.getId()), member, membership, comm, + scheduler); } /** diff --git a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java index 7f8443c270..9a7e21c760 100644 --- a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java +++ b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java @@ -34,6 +34,7 @@ import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; @@ -137,7 +138,8 @@ public void unbounded() throws NoSuchAlgorithmException, InterruptedException, I }, "Test: " + i); var gossiper = new ChRbcGossip(context.getId(), (SigningMember) member, members, controller.processor(), - com, metrics); + com, metrics, + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); gossipers.add(gossiper); dataSources.add(ds); controllers.add(controller); @@ -265,7 +267,8 @@ private void one(int iteration) }, "Test: " + i); var gossiper = new ChRbcGossip(context.getId(), (SigningMember) member, members, controller.processor(), - com, metrics); + com, metrics, + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); gossipers.add(gossiper); dataSources.add(ds); controllers.add(controller); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index c8f84fdb5d..5da22769e4 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ 
b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -58,10 +58,12 @@ class Binding { private final Parameters params; private final List seeds; private final View view; + private final ScheduledExecutorService scheduler; public Binding(View view, List seeds, Duration duration, DynamicContext context, CommonCommunications approaches, Node node, Parameters params, - FireflyMetrics metrics, DigestAlgorithm digestAlgo) { + FireflyMetrics metrics, DigestAlgorithm digestAlgo, ScheduledExecutorService scheduler) { + this.scheduler = scheduler; assert node != null; this.view = view; this.duration = duration; @@ -98,7 +100,7 @@ void seeding() { .map(nw -> view.new Participant(nw)) .filter(p -> !node.getId().equals(p.getId())) .collect(Collectors.toList()); - var seedlings = new SliceIterator<>("Seedlings", node, bootstrappers, approaches); + var seedlings = new SliceIterator<>("Seedlings", node, bootstrappers, approaches, scheduler); AtomicReference reseed = new AtomicReference<>(); var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); reseed.set(() -> { @@ -110,6 +112,8 @@ void seeding() { if (!redirect.isDone()) { scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reseed.get(), log)), params.retryDelay().toNanos(), TimeUnit.NANOSECONDS); + } else { + scheduler.shutdown(); } }, params.retryDelay()); }); @@ -141,7 +145,7 @@ private boolean complete(CompletableFuture redirect, Optional gateway, HashMultiset trusts, - Set initialSeedSet, Digest v, int majority, CompletableFuture complete, + Set iss, Digest v, int majority, CompletableFuture complete, AtomicInteger remaining, ListenableFuture futureSailor) { if (complete.isDone()) { return; @@ -182,6 +186,7 @@ private void complete(Member member, CompletableFuture gateway, HashMulti return; } trusts.add(new Bootstrapping(g.getTrust())); + var initialSeedSet = new HashSet<>(iss); initialSeedSet.addAll(g.getInitialSeedSetList()); log.trace("Initial seed set count: {} view: {} from: {} on: {}", g.getInitialSeedSetCount(), v, member.getId(), node.getId()); @@ -322,7 +327,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { this.context.rebalance(cardinality); node.nextNote(v); - final var redirecting = new SliceIterator<>("Gateways", node, sample, approaches); + final var redirecting = new SliceIterator<>("Gateways", node, sample, approaches, scheduler); var majority = redirect.getBootstrap() ? 
1 : Context.minimalQuorum(redirect.getRings(), this.context.getBias()); final var join = join(v); var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); @@ -336,9 +341,11 @@ private void join(Redirect redirect, Digest v, Duration duration) { complete.whenComplete((success, error) -> { if (error != null) { log.info("Failed Join on: {}", node.getId(), error); + scheduler.shutdown(); return; } if (success) { + scheduler.shutdown(); return; } log.info("Join unsuccessful, abandoned: {} trusts: {} on: {}", abandon.get(), trusts.entrySet() @@ -359,6 +366,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)), Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS); } else { + scheduler.shutdown(); log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId()); view.stop(); } @@ -375,6 +383,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { log.debug( "Abandoning Gateway view: {} abandons: {} majority: {} reseeding on: {}", v, abandon.get(), majority, node.getId()); + scheduler.shutdown(); complete.completeExceptionally(new TimeoutException("Failed Join")); seeding(); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 044d8e8545..9005abd90f 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -109,6 +109,7 @@ public class View { private final ViewManagement viewManagement; private final EventValidation validation; private final Verifiers verifiers; + private final ScheduledExecutorService scheduler; private volatile ScheduledFuture futureGossip; private volatile boolean boostrap = false; @@ -122,13 +123,14 @@ public View(DynamicContext context, ControlledIdentifierMember memb public View(DynamicContext context, ControlledIdentifierMember member, String endpoint, EventValidation validation, Verifiers verifiers, Router communications, Parameters params, Router gateway, DigestAlgorithm digestAlgo, FireflyMetrics metrics) { + scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); this.metrics = metrics; this.params = params; this.digestAlgo = digestAlgo; this.context = context; this.roundTimers = new RoundScheduler(String.format("Timers for: %s", context.getId()), context.timeToLive()); this.node = new Node(member, endpoint); - viewManagement = new ViewManagement(this, context, params, metrics, node, digestAlgo); + viewManagement = new ViewManagement(this, context, params, metrics, node, digestAlgo, scheduler); var service = new Service(); this.comm = communications.create(node, context.getId(), service, r -> new FfServer(communications.getClientIdentityProvider(), r, metrics), @@ -217,12 +219,10 @@ public void start(CompletableFuture onJoin, Duration d, List seedpod context.clear(); node.reset(); - var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - var initial = Entropy.nextBitsStreamLong(d.toNanos()); - scheduler.schedule(() -> Thread.ofVirtual() - .start(Utils.wrapped( - () -> new Binding(this, seeds, d, context, approaches, node, params, metrics, - digestAlgo).seeding(), log)), initial, TimeUnit.NANOSECONDS); + Thread.ofVirtual() + .start(Utils.wrapped( + () -> new Binding(this, seeds, d, context, approaches, node, params, metrics, 
digestAlgo, + scheduler).seeding(), log)); log.info("{} started on: {}", context.getId(), node.getId()); } @@ -247,6 +247,7 @@ public void stop() { comm.deregister(context.getId()); pendingRebuttals.clear(); context.active().forEach(context::offline); + scheduler.shutdown(); final var current = futureGossip; futureGossip = null; if (current != null) { @@ -494,10 +495,7 @@ void resetBootstrapView() { } void schedule(final Duration duration) { - var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - futureGossip = scheduler.schedule( - () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration, scheduler), log)), - Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS); + Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration), log)); } void scheduleClearObservations() { @@ -1046,9 +1044,8 @@ private BloomFilter getObservationsBff(long seed, double p) { * Execute one round of gossip * * @param duration - * @param scheduler */ - private void gossip(Duration duration, ScheduledExecutorService scheduler) { + private void gossip(Duration duration) { if (!started.get()) { return; } @@ -1057,7 +1054,7 @@ private void gossip(Duration duration, ScheduledExecutorService scheduler) { tick(); } gossiper.execute((link, ring) -> gossip(link, ring), - (result, destination) -> gossip(result, destination, duration, scheduler)); + (result, destination) -> gossip(result, destination, duration)); } /** @@ -1066,10 +1063,9 @@ private void gossip(Duration duration, ScheduledExecutorService scheduler) { * @param result * @param destination * @param duration - * @param scheduler */ private void gossip(Optional result, RingCommunications.Destination destination, - Duration duration, ScheduledExecutorService scheduler) { + Duration duration) { try { if (result.isPresent()) { final var member = destination.member(); @@ -1115,9 +1111,13 @@ private void gossip(Optional result, RingCommunications.Destination Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration, scheduler), log)), duration.toNanos(), - TimeUnit.NANOSECONDS); + try { + futureGossip = scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration), log)), duration.toNanos(), + TimeUnit.NANOSECONDS); + } catch (RejectedExecutionException e) { + // ignore + } } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 2008a0c05a..f850a852ac 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -15,7 +15,6 @@ import com.salesforce.apollo.fireflies.Binding.Bound; import com.salesforce.apollo.fireflies.View.Node; import com.salesforce.apollo.fireflies.View.Participant; -import com.salesforce.apollo.fireflies.comm.gossip.Fireflies; import com.salesforce.apollo.fireflies.proto.*; import com.salesforce.apollo.fireflies.proto.Update.Builder; import com.salesforce.apollo.membership.Member; @@ -63,17 +62,19 @@ public class ViewManagement { private final AtomicReference vote = new AtomicReference<>(); private final Lock joinLock = new ReentrantLock(); private final AtomicReference currentView = new AtomicReference<>(); + private final ScheduledExecutorService scheduler; private volatile boolean bootstrap; private volatile CompletableFuture onJoined; ViewManagement(View view, DynamicContext context, Parameters params, FireflyMetrics metrics, 
Node node, - DigestAlgorithm digestAlgo) { + DigestAlgorithm digestAlgo, ScheduledExecutorService scheduler) { this.node = node; this.view = view; this.context = context; this.params = params; this.metrics = metrics; this.digestAlgo = digestAlgo; + this.scheduler = scheduler; resetBootstrapView(); bootstrapView = currentView.get(); } @@ -400,7 +401,8 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time log.debug("Member pending join: {} view: {} context: {} on: {}", from, currentView(), context.getId(), node.getId()); var enjoining = new SliceIterator<>("Enjoining[%s:%s]".formatted(currentView(), from), node, - observers.stream().map(context::getActiveMember).toList(), view.comm); + observers.stream().map(context::getActiveMember).toList(), view.comm, + scheduler); enjoining.iterate(t -> t.enjoin(join), (_, _, _, _) -> true, () -> { }, Duration.ofMillis(1)); }); @@ -497,29 +499,6 @@ List observersList() { return observers().stream().toList(); } - void populate(List sample) { - var populate = new SliceIterator("Populate: " + context.getId(), node, sample, view.comm); - var repopulate = new AtomicReference(); - var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - repopulate.set(() -> populate.iterate((link) -> view.gossip(link, 0), (futureSailor, _, link, m) -> { - futureSailor.ifPresent(g -> { - if (!g.getRedirect().equals(SignedNote.getDefaultInstance())) { - final Participant member = (Participant) link.getMember(); - view.stable(() -> view.redirect(member, g, 0)); - } else { - view.stable(() -> view.processUpdates(g)); - } - }); - return !joined(); - }, () -> { - if (!joined()) { - scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(repopulate.get(), log)), - params.populateDuration().toNanos(), TimeUnit.NANOSECONDS); - } - }, params.populateDuration())); - repopulate.get().run(); - } - JoinGossip.Builder processJoins(BloomFilter bff) { JoinGossip.Builder builder = JoinGossip.newBuilder(); diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java index b23bf786d6..475b71b033 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java @@ -51,9 +51,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.*; import java.util.function.Predicate; import static com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory.digestOf; @@ -73,6 +71,7 @@ public class Gorgoneion { private final Parameters parameters; private final Predicate verifier; private final boolean bootstrap; + private final ScheduledExecutorService scheduler; public Gorgoneion(boolean bootstrap, Predicate verifier, Parameters parameters, ControlledIdentifierMember member, Context context, ProtoEventObserver observer, @@ -89,6 +88,7 @@ public Gorgoneion(boolean bootstrap, Predicate verifier, Para this.context = context; this.parameters = parameters; this.observer = observer; + this.scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); admissionsComm = admissionsRouter.create(member, context.getId(), new Admit(), ":admissions", r -> new AdmissionsServer(admissionsRouter.getClientIdentityProvider(), @@ -159,7 +159,8 @@ private 
SignedNonce generateNonce(KERL_ application) { var successors = context.size() == 1 ? Collections.singletonList(member) : context.bftSubset(digestOf(ident, parameters.digestAlgorithm())); final var majority = context.size() == 1 ? 1 : context.majority(); - final var redirecting = new SliceIterator<>("Nonce Endorsement", member, successors, endorsementComm); + final var redirecting = new SliceIterator<>("Nonce Endorsement", member, successors, endorsementComm, + scheduler); Set endorsements = Collections.newSetFromMap(new ConcurrentHashMap<>()); var generated = new CompletableFuture(); redirecting.iterate((link) -> { @@ -221,7 +222,8 @@ private CompletableFuture notarize(Credentials credentials, Validat var successors = context.bftSubset(digestOf(identifier.toIdent(), parameters.digestAlgorithm())); final var majority = context.size() == 1 ? 1 : context.majority(); - SliceIterator redirecting = new SliceIterator<>("Enrollment", member, successors, endorsementComm); + SliceIterator redirecting = new SliceIterator<>("Enrollment", member, successors, endorsementComm, + scheduler); var completed = new HashSet(); var result = new CompletableFuture(); redirecting.iterate((link) -> { @@ -253,7 +255,8 @@ private Validations register(Credentials request) { var successors = context.bftSubset(digestOf(identifier.toIdent(), parameters.digestAlgorithm())); final var majority = context.size() == 1 ? 1 : context.majority(); - final var redirecting = new SliceIterator<>("Credential verification", member, successors, endorsementComm); + final var redirecting = new SliceIterator<>("Credential verification", member, successors, endorsementComm, + scheduler); var verifications = new HashSet(); redirecting.iterate((link) -> { log.debug("Validating credentials for: {} contacting: {} on: {}", identifier, link.getMember().getId(), diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index 8f5b2a7be5..8b5d4957e5 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -19,12 +19,16 @@ import io.netty.handler.ssl.ClientAuth; import java.net.SocketAddress; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; /** * @author hal.hildebrand */ public class MtlsClient { + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final ManagedChannel channel; public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, ClientContextSupplier supplier, @@ -32,6 +36,7 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl Limiter limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build(); channel = NettyChannelBuilder.forAddress(address) + // .executor(executor) .withOption(ChannelOption.TCP_NODELAY, true) .sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3)) .intercept(new ConcurrencyLimitClientInterceptor(limiter, diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java index 4cf97b97a0..3cac8d6219 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java @@ -140,15 +140,15 @@ public static SslContext forServer(ClientAuth 
clientAuth, String alias, X509Cert public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, Predicate validator, ExecutorService executor) { - if (executor == null) { - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); - } + // if (executor == null) { + // executor = Executors.newVirtualThreadPerTaskExecutor(); + // } var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); } NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(epProvider.getBindAddress()) - .executor(executor) + // .executor(executor) .withOption(ChannelOption.SO_REUSEADDR, true) .sslContext(supplier.forServer(ClientAuth.REQUIRE, epProvider.getAlias(), diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java index cc0b7ec450..0942cee040 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java @@ -19,10 +19,7 @@ import java.io.IOException; import java.time.Duration; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @@ -170,8 +167,12 @@ private void proceed(Runnable onMajority, final boolean allow, Runnable proceed, } } log.trace("Proceeding for: <{}> on: {}", label, member.getId()); - scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(proceed, log)), frequency.toNanos(), - TimeUnit.NANOSECONDS); + try { + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(proceed, log)), frequency.toNanos(), + TimeUnit.NANOSECONDS); + } catch (RejectedExecutionException e) { + // ignore + } } else { log.trace("Termination for: <{}> on: {}", label, member.getId()); } diff --git a/memberships/src/test/java/com/salesforce/apollo/context/ContextTests.java b/memberships/src/test/java/com/salesforce/apollo/context/ContextTests.java index 03903b5d3d..f9f5d4dc88 100644 --- a/memberships/src/test/java/com/salesforce/apollo/context/ContextTests.java +++ b/memberships/src/test/java/com/salesforce/apollo/context/ContextTests.java @@ -41,11 +41,11 @@ public void consistency() throws Exception { } List predecessors = context.predecessors(members.get(0)); - assertEquals(predecessors.get(2), members.get(9)); + assertEquals(predecessors.get(2), members.get(3)); List successors = context.successors(members.get(1)); - assertEquals(members.get(0), successors.get(0)); - assertEquals(members.get(1), context.successor(1, members.get(0))); + assertEquals(members.get(8), successors.get(0)); + assertEquals(members.get(9), context.successor(1, members.get(0))); } @Test diff --git a/memberships/src/test/java/com/salesforce/apollo/context/StaticContextTest.java b/memberships/src/test/java/com/salesforce/apollo/context/StaticContextTest.java index 7a1b2d5bd0..2a165d6266 100644 --- a/memberships/src/test/java/com/salesforce/apollo/context/StaticContextTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/context/StaticContextTest.java @@ -36,11 +36,11 @@ public void consistency() throws Exception { var context = 
prototype.asStatic(); var predecessors = context.predecessors(members.get(0).getId()); - assertEquals(members.get(9), predecessors.get(2)); + assertEquals(members.get(3), predecessors.get(2)); var successors = context.successors(members.get(1).getId()); - assertEquals(members.get(0), successors.get(0)); - assertEquals(members.get(1), context.successor(1, members.get(0).getId())); + assertEquals(members.get(8), successors.get(0)); + assertEquals(members.get(9), context.successor(1, members.get(0).getId())); } @Test diff --git a/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java b/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java index 2334f18e4c..10827bcb0c 100644 --- a/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java @@ -63,10 +63,10 @@ public void delegated() throws Exception { ControlledIdentifier identifier = opti2; // identifier - assertTrue(identifier.getIdentifier() instanceof SelfAddressingIdentifier); + assertInstanceOf(SelfAddressingIdentifier.class, identifier.getIdentifier()); var sap = (SelfAddressingIdentifier) identifier.getIdentifier(); assertEquals(DigestAlgorithm.DEFAULT, sap.getDigest().getAlgorithm()); - assertEquals("092126af01f80ca28e7a99bbdce229c029be3bbfcb791e29ccb7a64e8019a36f", + assertEquals("6000b1b611a2a6cb27b6c569c056cf56e04da4905168020fc054d133181d379b", Hex.hex(sap.getDigest().getBytes())); assertEquals(1, ((Unweighted) identifier.getSigningThreshold()).getThreshold()); @@ -108,7 +108,7 @@ public void delegated() throws Exception { assertEquals(lastEstablishmentEvent.hash(DigestAlgorithm.DEFAULT), identifier.getDigest()); // lastEvent - assertTrue(kerl.getKeyEvent(identifier.getLastEvent()) == null); + assertNull(kerl.getKeyEvent(identifier.getLastEvent())); // delegation assertTrue(identifier.getDelegatingIdentifier().isPresent()); diff --git a/schemas/src/main/resources/sql-state/internal.xml b/schemas/src/main/resources/sql-state/internal.xml index a3b7e3c74b..f3a5ee7768 100644 --- a/schemas/src/main/resources/sql-state/internal.xml +++ b/schemas/src/main/resources/sql-state/internal.xml @@ -32,7 +32,7 @@ - + diff --git a/schemas/src/main/resources/stereotomy/stereotomy.xml b/schemas/src/main/resources/stereotomy/stereotomy.xml index 187ea9e74c..a77cd4070c 100644 --- a/schemas/src/main/resources/stereotomy/stereotomy.xml +++ b/schemas/src/main/resources/stereotomy/stereotomy.xml @@ -1,175 +1,176 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://www.liquibase.org/xml/ns/dbchangelog" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.4.xsd"> create schema if not exists stereotomy + schemaName="stereotomy"> + type="IDENTITY"> + primaryKey="true" primaryKeyName="identifier_pkey"/> - - + + + tableName="identifier" schemaName="stereotomy"/> + schemaName="stereotomy"> + type="IDENTITY"> + primaryKey="true" primaryKeyName="coordinates_pkey"/> - + - - + + - + - + - alter table stereotomy.coordinates add constraint - coordinates_ilk_validate check (ilk in ('dip', 'drt', 'icp', - 'ixn', 'nan', 'rct', 'vrc', 'rot')) + alter table stereotomy.coordinates + add constraint + coordinates_ilk_validate check (ilk in ('dip', 'drt', 'icp', + 'ixn', 'nan', 'rct', 'vrc', 'rot')) + + onDelete="CASCADE" baseColumnNames="identifier" + baseTableName="coordinates" baseTableSchemaName="stereotomy" + 
constraintName="coordinates_identifier_fk" + referencedTableName="identifier" referencedColumnNames="id" + referencedTableSchemaName="stereotomy"/> + columnNames="identifier, sequence_number, digest, ilk" + tableName="coordinates" schemaName="stereotomy" constraintName="unique_id_dig_seq_ilk"/> + schemaName="stereotomy"> + primaryKey="true"/> - - + + - - + + - + + columnNames="digest" + tableName="event" schemaName="stereotomy"/> + schemaName="stereotomy"> + primaryKey="true"/> - + + onDelete="CASCADE" baseColumnNames="identifier" + baseTableName="current_key_state" + baseTableSchemaName="stereotomy" + constraintName="current_key_state_identifier_fk" + referencedTableName="identifier" referencedColumnNames="id" + referencedTableSchemaName="stereotomy"/> + onDelete="CASCADE" baseColumnNames="current" + baseTableName="current_key_state" + baseTableSchemaName="stereotomy" + constraintName="current_key_state_current_fk" + referencedTableName="event" + referencedColumnNames="coordinates" + referencedTableSchemaName="stereotomy"/> + schemaName="stereotomy"> + primaryKey="true"/> + primaryKey="true"/> - - + + + onDelete="CASCADE" baseColumnNames="for" + baseTableName="receipt" baseTableSchemaName="stereotomy" + constraintName="receipt_for_fk" referencedTableName="coordinates" + referencedColumnNames="id" + referencedTableSchemaName="stereotomy"/> + schemaName="stereotomy"> - + - - + + + onDelete="CASCADE" baseColumnNames="for" + baseTableName="attachment" baseTableSchemaName="stereotomy" + constraintName="attachment_for_fk" + referencedTableName="coordinates" + referencedColumnNames="id" + referencedTableSchemaName="stereotomy"/> + tableName="attachment" schemaName="stereotomy"/> + schemaName="stereotomy"> + primaryKey="true"/> - + + primaryKey="true"/> - - + + + onDelete="CASCADE" baseColumnNames="for" + baseTableName="validation" baseTableSchemaName="stereotomy" + constraintName="validation_for_fk" referencedTableName="coordinates" + referencedColumnNames="id" + referencedTableSchemaName="stereotomy"/> - \ No newline at end of file + diff --git a/schemas/src/main/resources/thoth/thoth.xml b/schemas/src/main/resources/thoth/thoth.xml index 29b4f5ef2a..57520eb938 100644 --- a/schemas/src/main/resources/thoth/thoth.xml +++ b/schemas/src/main/resources/thoth/thoth.xml @@ -1,125 +1,125 @@ - - - create schema if not exists thoth - - - - - - - - - - - + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://www.liquibase.org/xml/ns/dbchangelog" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.4.xsd"> + + + create schema if not exists thoth + + + + + + + + + + + - + schemaName="thoth" tableName="identifier_location_hash"> + + + + + + + + + + + + + + + + + + + + + + alter table thoth.pending_coordinates + add constraint + pending_coordinates_ilk_validate check (ilk in ('dip', 'drt', 'icp', + 'ixn', 'nan', 'rct', 'vrc', 'rot')) + + + + + + + + - - - - - - - - - - - - - - - - - - - alter table thoth.pending_coordinates add constraint - pending_coordinates_ilk_validate check (ilk in ('dip', 'drt', 'icp', - 'ixn', 'nan', 'rct', 'vrc', 'rot')) - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + - + - - + + - + baseColumnNames="coordinates" baseTableName="pending_attachment" + baseTableSchemaName="thoth" + constraintName="pending_attachment_coordinates_fk" + referencedTableName="pending_coordinates" referencedColumnNames="id" + referencedTableSchemaName="thoth"/> + - + - - + + - - - \ No newline 
at end of file + baseColumnNames="coordinates" baseTableName="pending_validations" + baseTableSchemaName="thoth" + constraintName="pending_validations_coordinates_fk" + referencedTableName="pending_coordinates" referencedColumnNames="id" + referencedTableSchemaName="thoth"/> + + + diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java index 484ccd8dcc..c4100c5822 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java @@ -62,7 +62,7 @@ * @author hal.hildebrand */ abstract public class UniKERL implements DigestKERL { - public static final byte[] DIGEST_NONE_BYTES = Digest.NONE.toDigeste().toByteArray(); + public static final byte[] DIGEST_NONE_BYTES = Digest.NONE.getBytes(); private static final Logger log = LoggerFactory.getLogger(UniKERL.class); protected final DigestAlgorithm digestAlgorithm; @@ -103,7 +103,7 @@ public static void append(DSLContext dsl, AttachmentEvent attachment) { .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) .where(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(COORDINATES.DIGEST.eq(coordinates.getDigest().getBytes())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(coordinates.getIlk())) .fetchOne(); @@ -147,7 +147,7 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState, IDENTIFIER.PREFIX.eq(prevCoords.getIdentifier().toIdent().toByteArray())); final var prev = context.select(COORDINATES.ID) .from(COORDINATES) - .where(COORDINATES.DIGEST.eq(prevCoords.getDigest().toDigeste().toByteArray())) + .where(COORDINATES.DIGEST.eq(prevCoords.getDigest().getBytes())) .and(COORDINATES.IDENTIFIER.eq(preIdentifier)) .and(COORDINATES.SEQUENCE_NUMBER.eq(prevCoords.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(prevCoords.getIlk())) @@ -203,7 +203,7 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState, .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) .where(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(COORDINATES.DIGEST.eq(coordinates.getDigest().getBytes())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(coordinates.getIlk())) .fetchOne() @@ -214,7 +214,7 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState, try { context.insertInto(EVENT) .set(EVENT.COORDINATES, id) - .set(EVENT.DIGEST, digest.toDigeste().toByteArray()) + .set(EVENT.DIGEST, digest.getBytes()) .set(EVENT.CONTENT, compress(event.getBytes())) .set(EVENT.CURRENT_STATE, compress(newState.getBytes())) .execute(); @@ -261,7 +261,6 @@ public static void appendValidations(DSLContext dsl, EventCoordinates coordinate return; } final var identBytes = coordinates.getIdentifier().toIdent().toByteArray(); - try { dsl.mergeInto(IDENTIFIER) .using(dsl.selectOne()) @@ -290,10 +289,16 @@ public static void appendValidations(DSLContext dsl, EventCoordinates coordinate .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) .where(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - 
.and(COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(COORDINATES.DIGEST.eq(coordinates.getDigest().getBytes())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(coordinates.getIlk())) .fetchOne(); + if (id == null) { + log.info("Not found!: {}", coordinates.getIdentifier()); + } + } + if (id == null) { + log.info("Not inserted!: {}", coordinates.getIdentifier()); } var result = new AtomicInteger(); var l = id.value1(); @@ -349,7 +354,7 @@ public static void initialize(DSLContext dsl) { .using(context.selectOne()) .on(EVENT.COORDINATES.eq(0L)) .whenNotMatchedThenInsert(EVENT.COORDINATES, EVENT.DIGEST, EVENT.CONTENT) - .values(0L, ecNone.getDigest().toDigeste().toByteArray(), compress(new byte[0])) + .values(0L, ecNone.getDigest().getBytes(), compress(new byte[0])) .execute(); context.mergeInto(COORDINATES) @@ -357,8 +362,8 @@ public static void initialize(DSLContext dsl) { .on(COORDINATES.ID.eq(0L)) .whenNotMatchedThenInsert(COORDINATES.ID, COORDINATES.DIGEST, COORDINATES.IDENTIFIER, COORDINATES.SEQUENCE_NUMBER, COORDINATES.ILK) - .values(0L, ecNone.getDigest().toDigeste().toByteArray(), 0L, - ecNone.getSequenceNumber().toBigInteger(), ecNone.getIlk()) + .values(0L, ecNone.getDigest().getBytes(), 0L, ecNone.getSequenceNumber().toBigInteger(), + ecNone.getIlk()) .execute(); }); } @@ -374,7 +379,7 @@ public Attachment getAttachment(EventCoordinates coordinates) { .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) .where(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(COORDINATES.DIGEST.eq(coordinates.getDigest().getBytes())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(coordinates.getIlk())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) @@ -430,7 +435,7 @@ public KeyEvent getKeyEvent(Digest digest) { .from(EVENT) .join(COORDINATES) .on(COORDINATES.ID.eq(EVENT.COORDINATES)) - .where(EVENT.DIGEST.eq(digest.toDigeste().toByteString().toByteArray())) + .where(EVENT.DIGEST.eq(digest.getBytes())) .fetchOptional() .map(r -> toKeyEvent(decompress(r.value1()), r.value2())) .orElse(null); @@ -447,7 +452,7 @@ public KeyEvent getKeyEvent(EventCoordinates coordinates) { .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) .where(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(COORDINATES.DIGEST.eq(coordinates.getDigest().getBytes())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(coordinates.getIlk())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) @@ -467,7 +472,7 @@ public KeyState getKeyState(EventCoordinates coordinates) { .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) .where(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(COORDINATES.DIGEST.eq(coordinates.getDigest().getBytes())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(coordinates.getIlk())) .fetchOptional() @@ -539,7 +544,7 @@ public Map getValidations(EventCoordinates coordi .join(IDENTIFIER) 
.on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) .where(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(COORDINATES.DIGEST.eq(coordinates.getDigest().getBytes())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) .and(COORDINATES.ILK.eq(coordinates.getIlk())) .and(COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) diff --git a/stereotomy/src/test/java/com/salesforce/apollo/stereotomy/StereotomyTests.java b/stereotomy/src/test/java/com/salesforce/apollo/stereotomy/StereotomyTests.java index 862dc7efbf..7bf98d25e3 100644 --- a/stereotomy/src/test/java/com/salesforce/apollo/stereotomy/StereotomyTests.java +++ b/stereotomy/src/test/java/com/salesforce/apollo/stereotomy/StereotomyTests.java @@ -109,7 +109,7 @@ public void newIdentifier() throws Exception { assertInstanceOf(SelfAddressingIdentifier.class, identifier.getIdentifier()); var sap = (SelfAddressingIdentifier) identifier.getIdentifier(); assertEquals(DigestAlgorithm.DEFAULT, sap.getDigest().getAlgorithm()); - assertEquals("4cb6958622749694aedff3d48b8e402524562813bf2bdd11894a528edc965b4d", + assertEquals("9f207937484c3e47833f7f78d22974b3b543f6363c138ebeda20793c4a5c082b", Hex.hex(sap.getDigest().getBytes())); assertEquals(1, ((Unweighted) identifier.getSigningThreshold()).getThreshold()); @@ -170,7 +170,7 @@ public void newIdentifierFromIdentifier() throws Exception { assertInstanceOf(SelfAddressingIdentifier.class, identifier.getIdentifier()); var sap = (SelfAddressingIdentifier) identifier.getIdentifier(); assertEquals(DigestAlgorithm.DEFAULT, sap.getDigest().getAlgorithm()); - assertEquals("092126af01f80ca28e7a99bbdce229c029be3bbfcb791e29ccb7a64e8019a36f", + assertEquals("6000b1b611a2a6cb27b6c569c056cf56e04da4905168020fc054d133181d379b", Hex.hex(sap.getDigest().getBytes())); assertEquals(1, ((Unweighted) identifier.getSigningThreshold()).getThreshold()); diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index adf0e8d3da..831a114121 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -146,7 +146,7 @@ public KerlDHT(Duration operationsFrequency, Context context, this.connectionPool = connectionPool; kerlPool = new UniKERLDirectPooled(connectionPool, digestAlgorithm); this.reconcile = new RingCommunications<>(this.context, member, reconcileComms); - this.kerlSpace = new KerlSpace(connectionPool, member.getId()); + this.kerlSpace = new KerlSpace(connectionPool, member.getId(), digestAlgorithm); initializeSchema(); kerl = new CachingKERL(f -> { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java index 7efd785d57..8953060466 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java @@ -30,6 +30,7 @@ import org.jooq.Record1; import org.jooq.SQLDialect; import org.jooq.exception.DataAccessException; +import org.jooq.exception.IntegrityConstraintViolationException; import org.jooq.impl.DSL; import org.joou.ULong; import org.slf4j.Logger; @@ -61,10 +62,12 @@ public class KerlSpace { private static final Logger log = LoggerFactory.getLogger(KerlSpace.class); private final JdbcConnectionPool connectionPool; private final 
Digest member; + private final DigestAlgorithm algorithm; - public KerlSpace(JdbcConnectionPool connectionPool, Digest member) { + public KerlSpace(JdbcConnectionPool connectionPool, Digest member, DigestAlgorithm algorithm) { this.connectionPool = connectionPool; this.member = member; + this.algorithm = algorithm; } public static void upsert(DSLContext dsl, EventCoords coordinates, Attachment attachment, Digest member) { @@ -77,7 +80,7 @@ public static void upsert(DSLContext dsl, EventCoords coordinates, Attachment at Record1 id; try { id = dsl.insertInto(PENDING_COORDINATES) - .set(PENDING_COORDINATES.DIGEST, coordinates.getDigest().toByteArray()) + .set(PENDING_COORDINATES.DIGEST, Digest.from(coordinates.getDigest()).getBytes()) .set(PENDING_COORDINATES.IDENTIFIER, dsl.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) .set(PENDING_COORDINATES.ILK, coordinates.getIlk()) @@ -85,14 +88,14 @@ public static void upsert(DSLContext dsl, EventCoords coordinates, Attachment at ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger()) .returningResult(PENDING_COORDINATES.ID) .fetchOne(); - } catch (DataAccessException e) { + } catch (IntegrityConstraintViolationException e) { // Already exists id = dsl.select(PENDING_COORDINATES.ID) .from(PENDING_COORDINATES) .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toByteArray())) .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(PENDING_COORDINATES.DIGEST.eq(coordinates.getDigest().toByteArray())) + .and(PENDING_COORDINATES.DIGEST.eq(Digest.from(coordinates.getDigest()).getBytes())) .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq( ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger())) .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) @@ -118,7 +121,7 @@ public static void upsert(DSLContext context, KeyEvent event, DigestAlgorithm di long id; try { id = context.insertInto(PENDING_COORDINATES) - .set(PENDING_COORDINATES.DIGEST, prevCoords.getDigest().toDigeste().toByteArray()) + .set(PENDING_COORDINATES.DIGEST, prevCoords.getDigest().getBytes()) .set(PENDING_COORDINATES.IDENTIFIER, context.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) .set(PENDING_COORDINATES.ILK, event.getIlk()) @@ -126,26 +129,30 @@ public static void upsert(DSLContext context, KeyEvent event, DigestAlgorithm di .returningResult(PENDING_COORDINATES.ID) .fetchOne() .value1(); - } catch (DataAccessException e) { + } catch (IntegrityConstraintViolationException e) { // Already exists var coordinates = event.getCoordinates(); - id = context.select(PENDING_COORDINATES.ID) - .from(PENDING_COORDINATES) - .join(IDENTIFIER) - .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) - .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(PENDING_COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) - .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) - .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) - .fetchOne() - .value1(); + var result = context.select(PENDING_COORDINATES.ID) + .from(PENDING_COORDINATES) + .join(IDENTIFIER) + .on(IDENTIFIER.PREFIX.eq(identBytes)) + .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) + .and(PENDING_COORDINATES.DIGEST.eq(prevCoords.getDigest().getBytes())) + .and( + PENDING_COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) + .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) + .fetchOne(); + if (result == null) 
{ + throw new IllegalStateException("upsert failed", e); + } + id = result.value1(); } final var digest = event.hash(digestAlgorithm); try { context.insertInto(PENDING_EVENT) .set(PENDING_EVENT.COORDINATES, id) - .set(PENDING_EVENT.DIGEST, digest.toDigeste().toByteArray()) + .set(PENDING_EVENT.DIGEST, digest.getBytes()) .set(PENDING_EVENT.EVENT, event.getBytes()) .execute(); } catch (DataAccessException e) { @@ -173,7 +180,7 @@ public static void upsert(DSLContext dsl, Validations validations, Digest member Record1 id; try { id = dsl.insertInto(PENDING_COORDINATES) - .set(PENDING_COORDINATES.DIGEST, coordinates.getDigest().toByteArray()) + .set(PENDING_COORDINATES.DIGEST, Digest.from(coordinates.getDigest()).getBytes()) .set(PENDING_COORDINATES.IDENTIFIER, dsl.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) .set(PENDING_COORDINATES.ILK, coordinates.getIlk()) @@ -190,7 +197,7 @@ public static void upsert(DSLContext dsl, Validations validations, Digest member .join(IDENTIFIER) .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toByteArray())) .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(PENDING_COORDINATES.DIGEST.eq(coordinates.getDigest().toByteArray())) + .and(PENDING_COORDINATES.DIGEST.eq(Digest.from(coordinates.getDigest()).getBytes())) .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq( ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger())) .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) @@ -402,13 +409,7 @@ private Stream eventDigestsIn(KeyInterval interval, DSLContext dsl) { .where(IDENTIFIER_LOCATION_HASH.DIGEST.ge(interval.getBegin().getBytes())) .and(IDENTIFIER_LOCATION_HASH.DIGEST.le(interval.getEnd().getBytes())) .stream() - .map(r -> { - try { - return Digest.from(Digeste.parseFrom(r.value1())); - } catch (InvalidProtocolBufferException e) { - return null; - } - }) + .map(r -> new Digest(algorithm, r.value1())) .filter(d -> d != null), dsl.select(PENDING_EVENT.DIGEST) .from(PENDING_EVENT) .join(PENDING_COORDINATES) diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java index f6431f27f1..a02000f3c3 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java @@ -68,11 +68,11 @@ public void smokin() throws Exception { JdbcConnectionPool connectionPoolB = JdbcConnectionPool.create("jdbc:h2:mem:B;DB_CLOSE_DELAY=-1", "", ""); connectionPoolB.setMaxConnections(10); - var spaceA = new KerlSpace(connectionPoolA, DigestAlgorithm.DEFAULT.getOrigin()); + var spaceA = new KerlSpace(connectionPoolA, DigestAlgorithm.DEFAULT.getOrigin(), DigestAlgorithm.DEFAULT); var stereotomyA = new StereotomyImpl(new MemKeyStore(), new UniKERLDirectPooled(connectionPoolA, digestAlgorithm).create(), entropy); - var spaceB = new KerlSpace(connectionPoolB, DigestAlgorithm.DEFAULT.getLast()); + var spaceB = new KerlSpace(connectionPoolB, DigestAlgorithm.DEFAULT.getLast(), DigestAlgorithm.DEFAULT); var stereotomyB = new StereotomyImpl(new MemKeyStore(), new UniKERLDirectPooled(connectionPoolB, digestAlgorithm).create(), entropy); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java index 481ab9057c..34123a1292 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java @@ -60,10 +60,10 @@ public void 
delegated() throws Exception { ControlledIdentifier delegated = opti2; // identifier - assertTrue(delegated.getIdentifier() instanceof SelfAddressingIdentifier); + assertInstanceOf(SelfAddressingIdentifier.class, delegated.getIdentifier()); var sap = (SelfAddressingIdentifier) delegated.getIdentifier(); assertEquals(DigestAlgorithm.DEFAULT, sap.getDigest().getAlgorithm()); - assertEquals("092126af01f80ca28e7a99bbdce229c029be3bbfcb791e29ccb7a64e8019a36f", + assertEquals("6000b1b611a2a6cb27b6c569c056cf56e04da4905168020fc054d133181d379b", Hex.hex(sap.getDigest().getBytes())); assertEquals(1, ((Unweighted) delegated.getSigningThreshold()).getThreshold()); diff --git a/tron/src/main/java/com/chiralbehaviors/tron/Fsm.java b/tron/src/main/java/com/chiralbehaviors/tron/Fsm.java index d979fcccce..3f93e681af 100644 --- a/tron/src/main/java/com/chiralbehaviors/tron/Fsm.java +++ b/tron/src/main/java/com/chiralbehaviors/tron/Fsm.java @@ -1,6 +1,6 @@ /* * Copyright (c) 2013 ChiralBehaviors LLC, all rights reserved. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,6 +15,9 @@ */ package com.chiralbehaviors.tron; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -25,59 +28,51 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A Finite State Machine implementation. - * - * @author hhildebrand - * + * * @param the transition interface * @param the fsm context interface + * @author hhildebrand */ public final class Fsm { - private static class State { - private final Context context; - private final Transitions transitions; - - public State(Context context, Transitions transitions) { - this.context = context; - this.transitions = transitions; - } + private static final Logger DEFAULT_LOG = LoggerFactory.getLogger(Fsm.class); + private static final ThreadLocal> thisFsm = new ThreadLocal<>(); + private final Transitions proxy; + private final Deque> stack = new ArrayDeque<>(); + private final Lock sync; + private final Class transitionsType; + private Context context; + private Transitions current; + private Logger log; + private String name = ""; + private boolean pendingPop = false; + private State pendingPush; + private PendingTransition popTransition; + private Transitions previous; + private PendingTransition pushTransition; + private String transition; + Fsm(Context context, boolean sync, Class transitionsType, ClassLoader transitionsCL) { + this.setContext(context); + this.sync = sync ? 
new ReentrantLock() : null; + this.transitionsType = transitionsType; + this.log = DEFAULT_LOG; + @SuppressWarnings("unchecked") + Transitions facade = (Transitions) Proxy.newProxyInstance(transitionsCL, new Class[] { transitionsType }, + transitionsHandler()); + proxy = facade; } - private static class PendingTransition implements InvocationHandler { - private volatile Object[] args; - private volatile Method method; - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - if (this.method != null) { - throw new IllegalStateException(String.format("Pop transition '%s' has already been established", - method.toGenericString())); - } - this.method = method; - this.args = args; - return null; - } - } - - private static final Logger DEFAULT_LOG = LoggerFactory.getLogger(Fsm.class); - private static final ThreadLocal> thisFsm = new ThreadLocal<>(); - /** * Construct a new instance of a finite state machine. - * + * * @param fsmContext - the object used as the action context for this FSM - * @param transitions - the interface class used to define the transitions for - * this FSM - * @param transitionsCL - the class loader to be used to load the transitions - * interface class + * @param transitions - the interface class used to define the transitions for this FSM + * @param transitionsCL - the class loader to be used to load the transitions interface class * @param initialState - the initial state of the FSM - * @param sync - true if this FSM is to synchronize state transitions. - * This is required for multi-threaded use of the FSM + * @param sync - true if this FSM is to synchronize state transitions. This is required for multi-threaded + * use of the FSM * @return the Fsm instance */ public static Fsm construct(Context fsmContext, @@ -85,8 +80,9 @@ public static Fsm construct(Context ClassLoader transitionsCL, Enum initialState, boolean sync) { if (!transitions.isAssignableFrom(initialState.getClass())) { - throw new IllegalArgumentException(String.format("Supplied initial state '%s' does not implement the transitions interface '%s'", - initialState, transitions)); + throw new IllegalArgumentException( + String.format("Supplied initial state '%s' does not implement the transitions interface '%s'", initialState, + transitions)); } Fsm fsm = new Fsm<>(fsmContext, sync, transitions, transitionsCL); @SuppressWarnings("unchecked") @@ -96,8 +92,7 @@ public static Fsm construct(Context } /** - * Construct a new instance of a finite state machine with a default - * ClassLoader. + * Construct a new instance of a finite state machine with a default ClassLoader. 
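The constructor above builds the Transitions facade with java.lang.reflect.Proxy, routing every interface call through a single InvocationHandler into fire(method, args). A stripped-down sketch of that pattern, using a hypothetical Greeter interface that is not part of this patch:

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;

    public class ProxyFacadeSketch {
        interface Greeter {
            String greet(String name);
        }

        public static void main(String[] args) {
            // Every call on the facade funnels into one handler, the same shape as
            // Fsm's transitionsHandler() returning locked(() -> fire(method, args)).
            InvocationHandler handler = (proxy, method, callArgs) ->
                "handled " + method.getName() + "(" + callArgs[0] + ")";
            Greeter facade = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
            System.out.println(facade.greet("tron"));   // prints: handled greet(tron)
        }
    }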
*/ public static Fsm construct(Context fsmContext, Class transitions, @@ -106,7 +101,6 @@ public static Fsm construct(Context } /** - * * @return the Context of the currently executing Fsm */ public static Context thisContext() { @@ -116,7 +110,6 @@ public static Context thisContext() { } /** - * * @return the currrently executing Fsm */ public static Fsm thisFsm() { @@ -125,35 +118,8 @@ public static Fsm thisFsm() { return fsm; } - private Context context; - private Transitions current; - private Logger log; - private String name = ""; - private boolean pendingPop = false; - private State pendingPush; - private PendingTransition popTransition; - private Transitions previous; - private final Transitions proxy; - private PendingTransition pushTransition; - private final Deque> stack = new ArrayDeque<>(); - private final Lock sync; - private String transition; - private final Class transitionsType; - - Fsm(Context context, boolean sync, Class transitionsType, ClassLoader transitionsCL) { - this.setContext(context); - this.sync = sync ? new ReentrantLock() : null; - this.transitionsType = transitionsType; - this.log = DEFAULT_LOG; - @SuppressWarnings("unchecked") - Transitions facade = (Transitions) Proxy.newProxyInstance(transitionsCL, new Class[] { transitionsType }, - transitionsHandler()); - proxy = facade; - } - /** - * Execute the initial state's entry action. Note that we do not guard against - * multiple invocations. + * Execute the initial state's entry action. Note that we do not guard against multiple invocations. */ public void enterStartState() { if (log.isTraceEnabled()) { @@ -163,7 +129,6 @@ public void enterStartState() { } /** - * * @return the action context object of this Fsm */ public Context getContext() { @@ -171,18 +136,21 @@ public Context getContext() { } /** - * + * Set the Context of the FSM + */ + public void setContext(Context context) { + this.context = context; + } + + /** * @return the current state of the Fsm */ public Transitions getCurrentState() { - return locked(() -> { - Transitions transitions = current; - return transitions; - }); + Transitions transitions = current; + return transitions; } /** - * * @return the logger used by this Fsm */ public Logger getLog() { @@ -192,12 +160,24 @@ public Logger getLog() { }); } + /** + * Set the Logger for this Fsm. + * + * @param log - the Logger of this Fsm + */ + public void setLog(Logger log) { + this.log = log; + } + public String getName() { return name; } + public void setName(String name) { + this.name = name; + } + /** - * * @return the previous state of the Fsm, or null if no previous state */ public Transitions getPreviousState() { @@ -208,7 +188,6 @@ public Transitions getPreviousState() { } /** - * * @return the String representation of the current transition */ public String getTransition() { @@ -218,7 +197,6 @@ public String getTransition() { } /** - * * @return the Transitions object that drives this Fsm through its transitions */ public Transitions getTransitions() { @@ -233,12 +211,10 @@ public InvalidTransition invalidTransitionOn() { } /** - * Pop the state off of the stack of pushed states. This state will become the - * current state of the Fsm. Answer the Transitions object that may be used to - * send a transition to the popped state. - * - * @return the Transitions object that may be used to send a transition to the - * popped state. + * Pop the state off of the stack of pushed states. This state will become the current state of the Fsm. 
Answer the + * Transitions object that may be used to send a transition to the popped state. + * + * @return the Transitions object that may be used to send a transition to the popped state. */ public Transitions pop() { if (pendingPop) { @@ -248,8 +224,9 @@ public Transitions pop() { throw new IllegalStateException(String.format("[%s] Cannot pop after pushing", name)); } if (stack.size() == 0) { - throw new IllegalStateException(String.format("[%s] State stack is empty, current state: %s, transition: %s", - name, prettyPrint(current), transition)); + throw new IllegalStateException( + String.format("[%s] State stack is empty, current state: %s, transition: %s", name, prettyPrint(current), + transition)); } pendingPop = true; popTransition = new PendingTransition(); @@ -270,9 +247,8 @@ public String prettyPrint(Transitions state) { } /** - * Push the current state of the Fsm on the state stack. The supplied state - * becomes the current state of the Fsm - * + * Push the current state of the Fsm on the state stack. The supplied state becomes the current state of the Fsm + * * @param state - the new current state of the Fsm. */ public Transitions push(Transitions state) { @@ -280,9 +256,8 @@ public Transitions push(Transitions state) { } /** - * Push the current state of the Fsm on the state stack. The supplied state - * becomes the current state of the Fsm - * + * Push the current state of the Fsm on the state stack. The supplied state becomes the current state of the Fsm + * * @param state - the new current state of the Fsm. * @param context - the new current context of the FSM */ @@ -305,26 +280,6 @@ public Transitions push(Transitions state, Context context) { return pendingTransition; } - /** - * Set the Context of the FSM - */ - public void setContext(Context context) { - this.context = context; - } - - /** - * Set the Logger for this Fsm. 
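The entry and exit dispatch below invokes any method annotated with @Entry or @Exit on the current state, passing getContext() when the method declares a parameter and invoking it with no arguments otherwise. A small sketch of a state written against that contract, assuming the Entry and Exit annotations live alongside Fsm in com.chiralbehaviors.tron (the unqualified references in this file suggest they do); the Conn* names are invented for illustration:

    import com.chiralbehaviors.tron.Entry;
    import com.chiralbehaviors.tron.Exit;

    public class EntryExitSketch {
        interface ConnContext {
            void startHeartbeat();
            void stopHeartbeat();
        }

        interface ConnFsm {
            ConnFsm close();
        }

        enum ConnState implements ConnFsm {
            OPEN {
                @Entry
                public void onOpen(ConnContext ctx) {   // declares a parameter: Fsm injects getContext()
                    ctx.startHeartbeat();
                }

                @Exit
                public void onClose() {                 // no parameters: invoked as action.invoke(current)
                }

                public ConnFsm close() { return CLOSED; }
            },
            CLOSED {
                public ConnFsm close() { return CLOSED; }
            }
        }
    }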
- * - * @param log - the Logger of this Fsm - */ - public void setLog(Logger log) { - this.log = log; - } - - public void setName(String name) { - this.name = name; - } - public R synchonizeOnState(Callable call) throws Exception { return locked(call); } @@ -347,15 +302,15 @@ private void executeEntryAction() { if (action.isAnnotationPresent(Entry.class)) { action.setAccessible(true); if (log.isTraceEnabled()) { - log.trace(String.format("[%s] Entry action: %s.%s", name, prettyPrint(current), - prettyPrint(action))); + log.trace( + String.format("[%s] Entry action: %s.%s", name, prettyPrint(current), prettyPrint(action))); } try { // For entry actions with parameters, inject the context if (action.getParameterTypes().length > 0) action.invoke(current, getContext()); else - action.invoke(current, new Object[] {}); + action.invoke(current); return; } catch (IllegalAccessException | IllegalArgumentException e) { throw new IllegalStateException(e); @@ -375,15 +330,15 @@ private void executeExitAction() { if (action.isAnnotationPresent(Exit.class)) { action.setAccessible(true); if (log.isTraceEnabled()) { - log.trace(String.format("[%s] Exit action: %s.%s", name, prettyPrint(current), - prettyPrint(action))); + log.trace( + String.format("[%s] Exit action: %s.%s", name, prettyPrint(current), prettyPrint(action))); } try { // For exit action with parameters, inject the context if (action.getParameterTypes().length > 0) action.invoke(current, getContext()); else - action.invoke(current, new Object[] {}); + action.invoke(current); return; } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { throw new IllegalStateException(e); @@ -394,7 +349,7 @@ private void executeExitAction() { /** * The Jesus Nut - * + * * @param t - the transition to fire * @param arguments - the transition arguments * @return @@ -427,8 +382,9 @@ private Object fire(Method t, Object[] arguments) { transitionTo(nextState); } else { if (nextState != null && log.isTraceEnabled()) { - log.trace(String.format("[%s] Eliding Transition %s -> %s, pinned state: %s", name, - prettyPrint(current), prettyPrint(nextState), prettyPrint(pinned))); + log.trace( + String.format("[%s] Eliding Transition %s -> %s, pinned state: %s", name, prettyPrint(current), + prettyPrint(nextState), prettyPrint(pinned))); } } return null; @@ -439,10 +395,9 @@ private Object fire(Method t, Object[] arguments) { /** * Fire the concrete transition of the current state - * + * * @param stateTransition - the transition method to execute * @param arguments - the arguments of the method - * * @return the next state */ @SuppressWarnings("unchecked") @@ -454,9 +409,9 @@ private Transitions fireTransition(Method stateTransition, Object[] arguments) { try { return (Transitions) stateTransition.invoke(current, (Object[]) null); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - throw new IllegalStateException(String.format("Unable to invoke transition %s,%s", prettyPrint(current), - prettyPrint(stateTransition)), - e); + throw new IllegalStateException( + String.format("Unable to invoke transition %s,%s", prettyPrint(current), prettyPrint(stateTransition)), + e); } } if (log.isTraceEnabled()) { @@ -465,23 +420,23 @@ private Transitions fireTransition(Method stateTransition, Object[] arguments) { try { return (Transitions) stateTransition.invoke(current, arguments); } catch (IllegalAccessException | IllegalArgumentException e) { - throw new IllegalStateException(String.format("Unable to 
invoke transition %s.%s", prettyPrint(current), - prettyPrint(stateTransition)), - e.getCause()); + throw new IllegalStateException( + String.format("Unable to invoke transition %s.%s", prettyPrint(current), prettyPrint(stateTransition)), + e.getCause()); } catch (InvocationTargetException e) { if (e.getTargetException() instanceof InvalidTransition) { if (log.isTraceEnabled()) { - log.trace(String.format("[%s] Invalid transition %s.%s", name, prettyPrint(current), - getTransition())); + log.trace( + String.format("[%s] Invalid transition %s.%s", name, prettyPrint(current), getTransition())); } throw (InvalidTransition) e.getTargetException(); } if (e.getTargetException() instanceof RuntimeException) { throw (RuntimeException) e.getTargetException(); } - throw new IllegalStateException(String.format("[%s] Unable to invoke transition %s.%s", name, - prettyPrint(current), prettyPrint(stateTransition)), - e.getTargetException()); + throw new IllegalStateException( + String.format("[%s] Unable to invoke transition %s.%s", name, prettyPrint(current), + prettyPrint(stateTransition)), e.getTargetException()); } } @@ -527,10 +482,9 @@ private Method lookupDefaultTransition(InvalidTransition previousException, Meth /** * Lookup the transition. - * + * * @param t - the transition defined in the interface - * @return the transition Method for the current state matching the interface - * definition + * @return the transition Method for the current state matching the interface definition */ private Method lookupTransition(Method t) { Method stateTransition = null; @@ -538,8 +492,9 @@ private Method lookupTransition(Method t) { // First we try declared methods on the state stateTransition = current.getClass().getMethod(t.getName(), t.getParameterTypes()); } catch (NoSuchMethodException | SecurityException e1) { - throw new IllegalStateException(String.format("Inconcievable! The state %s does not implement the transition %s", - prettyPrint(current), prettyPrint(t))); + throw new IllegalStateException( + String.format("Inconcievable! The state %s does not implement the transition %s", prettyPrint(current), + prettyPrint(t))); } stateTransition.setAccessible(true); return stateTransition; @@ -547,7 +502,7 @@ private Method lookupTransition(Method t) { /** * Ye olde tyme state transition - * + * * @param nextState - the next state of the Fsm */ private void normalTransition(Transitions nextState) { @@ -559,16 +514,16 @@ private void normalTransition(Transitions nextState) { } executeExitAction(); if (log.isTraceEnabled()) { - log.trace(String.format("[%s] State transition: %s -> %s", name, prettyPrint(current), - prettyPrint(nextState))); + log.trace( + String.format("[%s] State transition: %s -> %s", name, prettyPrint(current), prettyPrint(nextState))); } current = nextState; executeEntryAction(); } /** - * Execute the exit action of the current state. Set current state to popped - * state of the stack. Execute any pending transition on the current state. + * Execute the exit action of the current state. Set current state to popped state of the stack. Execute any pending + * transition on the current state. */ private void popTransition() { pendingPop = false; @@ -620,11 +575,10 @@ private String prettyPrint(Method transition) { } /** - * Push the current state of the Fsm to the stack, with the supplied context as - * the new current context of the FSM, if non null. Transition the Fsm to the - * nextState, execute the entry action of that state. 
Set the current state of - * the Fsm to the pending push state, executing the entry action on that state - * + * Push the current state of the Fsm to the stack, with the supplied context as the new current context of the FSM, + * if non null. Transition the Fsm to the nextState, execute the entry action of that state. Set the current state + * of the Fsm to the pending push state, executing the entry action on that state + * * @param nextState */ private void pushTransition(Transitions nextState) { @@ -658,18 +612,9 @@ private void pushTransition(Transitions nextState) { } } - private InvocationHandler transitionsHandler() { - return new InvocationHandler() { - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - return locked(() -> fire(method, args)); - } - }; - } - /** * Transition to the next state - * + * * @param nextState */ private void transitionTo(Transitions nextState) { @@ -681,4 +626,40 @@ private void transitionTo(Transitions nextState) { normalTransition(nextState); } } + + private InvocationHandler transitionsHandler() { + return new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + return locked(() -> fire(method, args)); + } + }; + } + + private static class State { + private final Context context; + private final Transitions transitions; + + public State(Context context, Transitions transitions) { + this.context = context; + this.transitions = transitions; + } + + } + + private static class PendingTransition implements InvocationHandler { + private volatile Object[] args; + private volatile Method method; + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (this.method != null) { + throw new IllegalStateException( + String.format("Pop transition '%s' has already been established", method.toGenericString())); + } + this.method = method; + this.args = args; + return null; + } + } }