From a6ae3893c24babc291dccb919722e0577830a836 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Fri, 28 Jun 2024 18:06:20 -0700 Subject: [PATCH] don't use parallel stream on join. suppress metric reporting, gated with system prop --- .../java/com/salesforce/apollo/choam/CHOAM.java | 2 +- .../com/salesforce/apollo/choam/ViewAssembly.java | 5 ++++- .../com/salesforce/apollo/choam/SessionTest.java | 12 +++++++----- .../com/salesforce/apollo/choam/TestCHOAM.java | 13 +++++++------ .../salesforce/apollo/fireflies/ChurnTest.java | 14 ++++++++------ .../com/salesforce/apollo/fireflies/E2ETest.java | 14 ++++++++------ .../com/salesforce/apollo/fireflies/MtlsTest.java | 12 +++++++----- .../salesforce/apollo/fireflies/SwarmTest.java | 14 ++++++++------ .../salesforce/apollo/messaging/rbc/RbcTest.java | 12 +++++++----- .../com/salesforce/apollo/state/CHOAMTest.java | 15 ++++++++------- 10 files changed, 65 insertions(+), 48 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index f65003a97..42cf1dce1 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -1445,7 +1445,7 @@ private void join(View view, Collection members, AtomicInteger joined) { .setSignature(params.member().sign(inView.toByteString()).toSig()) .build(); var countdown = new CountDownLatch(sampled.size()); - sampled.parallelStream().map(m -> { + sampled.stream().map(m -> { var connection = comm.connect(m); log.trace("connect to: {} is: {} on: {}", m.getId(), connection, params.member().getId()); return connection; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 25e4cb2bb..12d4b4e3d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -20,6 +20,7 @@ import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.ethereal.Dag; import com.salesforce.apollo.membership.Member; +import com.salesforce.apollo.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -199,7 +200,9 @@ boolean complete() { + selected.assembly.size(); log.debug("View Assembly: {} completed assembly: {} on: {}", nextViewId, slate.keySet().stream().sorted().toList(), params().member().getId()); - transitions.complete(); + Thread.ofVirtual().start(Utils.wrapped(() -> { + transitions.complete(); + }, log)); return true; } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java index 315a836a6..c8b052316 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java @@ -178,10 +178,12 @@ public void scalingTest() throws Exception { } } System.out.println(); - ConsoleReporter.forRegistry(reg) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + ConsoleReporter.forRegistry(reg) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 4c5866d4d..30a727ae9 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -203,12 +203,13 @@ public void submitMultiplTxn() throws Exception { choams.values().forEach(e -> e.stop()); System.out.println(); - - ConsoleReporter.forRegistry(registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + ConsoleReporter.forRegistry(registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } } assertTrue(checkpointOccurred.get(5, TimeUnit.SECONDS)); } diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index 913623451..921a54ff2 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -260,12 +260,14 @@ public void churn() throws Exception { assertTrue(testGraph.isSC()); } - System.out.println("Node 0 metrics"); - ConsoleReporter.forRegistry(node0Registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + System.out.println("Node 0 metrics"); + ConsoleReporter.forRegistry(node0Registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } } private void initialize() { diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java index 6b4121346..7a9893293 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java @@ -221,12 +221,14 @@ private void initialize() { private void post() { communications.forEach(e -> e.close(Duration.ofSeconds(0))); views.forEach(view -> view.stop()); - System.out.println("Node 0 metrics"); - ConsoleReporter.forRegistry(node0Registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + System.out.println("Node 0 metrics"); + ConsoleReporter.forRegistry(node0Registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } } private void validateConstraints() { diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index 9a1176cb7..3361d7084 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -183,11 +183,13 @@ public void smoke() throws Exception { System.out.println("Stoping views"); views.forEach(view -> view.stop()); - ConsoleReporter.forRegistry(node0Registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + ConsoleReporter.forRegistry(node0Registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } } private Function clientContextSupplier() { diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index 2746534ce..fa0d75aea 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -204,12 +204,14 @@ public void swarm() throws Exception { } communications.forEach(e -> e.close(Duration.ofSeconds(1))); views.forEach(view -> view.stop()); - System.out.println("Node 0 metrics"); - ConsoleReporter.forRegistry(node0Registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + System.out.println("Node 0 metrics"); + ConsoleReporter.forRegistry(node0Registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } } private void initialize() { diff --git a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java index 2d6eb50c2..ed5c8ff08 100644 --- a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java @@ -145,11 +145,13 @@ public void broadcast() throws Exception { System.out.println(); - ConsoleReporter.forRegistry(registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + ConsoleReporter.forRegistry(registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } } class Receiver implements MessageHandler { diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index d0dca0785..12a79a49c 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -117,11 +117,13 @@ public void after() throws Exception { members = null; System.out.println(); - ConsoleReporter.forRegistry(registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + if (Boolean.getBoolean("reportMetrics")) { + ConsoleReporter.forRegistry(registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); + } registry = null; } @@ -328,8 +330,7 @@ public void endBlock(ULong height, Digest hash) { @Override public void execute(int i, Digest hash, Transaction tx, @SuppressWarnings("rawtypes") CompletableFuture onComplete) { - up.getExecutor() - .execute(i, hash, tx, onComplete); + up.getExecutor().execute(i, hash, tx, onComplete); } @Override