Skip to content

Commit

Permalink
don't use parallel stream on join.
Browse files Browse the repository at this point in the history
suppress metric reporting, gated with system prop
  • Loading branch information
Hellblazer committed Jun 29, 2024
1 parent a799b86 commit a6ae389
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 48 deletions.
2 changes: 1 addition & 1 deletion choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ private void join(View view, Collection<Member> members, AtomicInteger joined) {
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
var countdown = new CountDownLatch(sampled.size());
sampled.parallelStream().map(m -> {
sampled.stream().map(m -> {
var connection = comm.connect(m);
log.trace("connect to: {} is: {} on: {}", m.getId(), connection, params.member().getId());
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.ethereal.Dag;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -199,7 +200,9 @@ boolean complete() {
+ selected.assembly.size();
log.debug("View Assembly: {} completed assembly: {} on: {}", nextViewId,
slate.keySet().stream().sorted().toList(), params().member().getId());
transitions.complete();
Thread.ofVirtual().start(Utils.wrapped(() -> {
transitions.complete();
}, log));
return true;
}

Expand Down
12 changes: 7 additions & 5 deletions choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,12 @@ public void scalingTest() throws Exception {
}
}
System.out.println();
ConsoleReporter.forRegistry(reg)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
ConsoleReporter.forRegistry(reg)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
}
}
13 changes: 7 additions & 6 deletions choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,13 @@ public void submitMultiplTxn() throws Exception {
choams.values().forEach(e -> e.stop());

System.out.println();

ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
}
assertTrue(checkpointOccurred.get(5, TimeUnit.SECONDS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,14 @@ public void churn() throws Exception {
assertTrue(testGraph.isSC());
}

System.out.println("Node 0 metrics");
ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
System.out.println("Node 0 metrics");
ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
}

private void initialize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,14 @@ private void initialize() {
private void post() {
communications.forEach(e -> e.close(Duration.ofSeconds(0)));
views.forEach(view -> view.stop());
System.out.println("Node 0 metrics");
ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
System.out.println("Node 0 metrics");
ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
}

private void validateConstraints() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,13 @@ public void smoke() throws Exception {
System.out.println("Stoping views");
views.forEach(view -> view.stop());

ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
}

private Function<Member, ClientContextSupplier> clientContextSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,14 @@ public void swarm() throws Exception {
}
communications.forEach(e -> e.close(Duration.ofSeconds(1)));
views.forEach(view -> view.stop());
System.out.println("Node 0 metrics");
ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
System.out.println("Node 0 metrics");
ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
}

private void initialize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,13 @@ public void broadcast() throws Exception {

System.out.println();

ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
}

class Receiver implements MessageHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,13 @@ public void after() throws Exception {
members = null;
System.out.println();

ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
if (Boolean.getBoolean("reportMetrics")) {
ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
registry = null;
}

Expand Down Expand Up @@ -328,8 +330,7 @@ public void endBlock(ULong height, Digest hash) {
@Override
public void execute(int i, Digest hash, Transaction tx,
@SuppressWarnings("rawtypes") CompletableFuture onComplete) {
up.getExecutor()
.execute(i, hash, tx, onComplete);
up.getExecutor().execute(i, hash, tx, onComplete);
}

@Override
Expand Down

0 comments on commit a6ae389

Please sign in to comment.