diff --git a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java index b6754d0c8..bcaf8bdb2 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java @@ -93,7 +93,7 @@ public void func() throws Exception { @Test public void scalingTest() throws Exception { - var exec = Executors.newFixedThreadPool(2); + var exec = Executors.newVirtualThreadPerTaskExecutor(); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); Context context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin(), 9, 0.2, 3); var entropy = SecureRandom.getInstance("SHA1PRNG"); diff --git a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java index 2afcdf036..985499bb6 100644 --- a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java +++ b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java @@ -130,8 +130,6 @@ private void one(int iteration, List consumers) var ds = new SimpleDataSource(); final short pid = i; List output = produced.get(pid); - final var exec = Executors.newFixedThreadPool(2, Thread.ofVirtual().factory()); - executors.add(exec); final var member = members.get(i); var com = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder()); comms.add(com); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index 252d421c8..bce23f714 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -61,23 +61,24 @@ * @since 220 */ public class MtlsTest { - private static final int CARDINALITY; - private static final Map certs = new HashMap<>(); - private static final Map endpoints = new HashMap<>(); - private static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests"); - private static Map> identities; + private static final int CARDINALITY; + private static final Map certs = new HashMap<>(); + private static final Map endpoints = new HashMap<>(); + private static final boolean LARGE_TESTS = Boolean.getBoolean( + "large_tests"); + private static Map> identities; static { CARDINALITY = LARGE_TESTS ? 100 : 10; } private List communications = new ArrayList<>(); - private List views; + private List views; @BeforeAll public static void beforeClass() throws Exception { var entropy = SecureRandom.getInstance("SHA1PRNG"); - entropy.setSeed(new byte[]{6, 6, 6}); + entropy.setSeed(new byte[] { 6, 6, 6 }); String localhost = InetAddress.getLoopbackAddress().getHostName(); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); identities = IntStream.range(0, CARDINALITY).mapToObj(i -> { @@ -86,7 +87,7 @@ public static void beforeClass() throws Exception { identities.entrySet().forEach(e -> { InetSocketAddress endpoint = new InetSocketAddress(localhost, Utils.allocatePort()); certs.put(e.getKey(), - e.getValue().provision(Instant.now(), Duration.ofDays(1), SignatureAlgorithm.DEFAULT)); + e.getValue().provision(Instant.now(), Duration.ofDays(1), SignatureAlgorithm.DEFAULT)); endpoints.put(e.getKey(), endpoint); }); } @@ -114,9 +115,9 @@ public void smoke() throws Exception { var ctxBuilder = Context.newBuilder().setCardinality(CARDINALITY); var seeds = members.stream() - .map(m -> new Seed(m.getEvent().getCoordinates(), endpoints.get(m.getId()))) - .limit(LARGE_TESTS ? 24 : 3) - .toList(); + .map(m -> new Seed(m.getEvent().getCoordinates(), endpoints.get(m.getId()))) + .limit(LARGE_TESTS ? 24 : 3) + .toList(); var builder = ServerConnectionCache.newBuilder().setTarget(30); var frist = new AtomicBoolean(true); @@ -126,17 +127,16 @@ public void smoke() throws Exception { views = members.stream().map(node -> { Context context = ctxBuilder.build(); FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(), - frist.getAndSet(false) ? node0Registry : registry); + frist.getAndSet(false) ? node0Registry : registry); EndpointProvider ep = new StandardEpProvider(endpoints.get(node.getId()), ClientAuth.REQUIRE, - CertificateValidator.NONE, resolver); + CertificateValidator.NONE, resolver); builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry)); CertificateWithPrivateKey certWithKey = certs.get(node.getId()); - Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey), - Executors.newFixedThreadPool(2, Thread.ofVirtual().factory())).router( - builder); + Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router( + builder); communications.add(comms); return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, comms, parameters, - DigestAlgorithm.DEFAULT, metrics); + DigestAlgorithm.DEFAULT, metrics); }).collect(Collectors.toList()); var then = System.currentTimeMillis(); @@ -145,8 +145,8 @@ public void smoke() throws Exception { var countdown = new AtomicReference<>(new CountDownLatch(1)); views.get(0) - .start(() -> countdown.get().countDown(), duration, Collections.emptyList(), - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())); + .start(() -> countdown.get().countDown(), duration, Collections.emptyList(), + Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())); assertTrue(countdown.get().await(30, TimeUnit.SECONDS), "KERNEL did not stabilize"); @@ -156,40 +156,42 @@ public void smoke() throws Exception { countdown.set(new CountDownLatch(seedlings.size())); seedlings.forEach(view -> view.start(() -> countdown.get().countDown(), duration, kernel, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); assertTrue(countdown.get().await(30, TimeUnit.SECONDS), "Seeds did not stabilize"); countdown.set(new CountDownLatch(views.size() - seeds.size())); views.forEach(view -> view.start(() -> countdown.get().countDown(), duration, seeds, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); assertTrue(Utils.waitForCondition(120_000, 1_000, () -> { return views.stream() - .map(view -> view.getContext().activeCount() != views.size() ? view : null) - .filter(view -> view != null) - .count() == 0; - }), "view did not stabilize: " - + views.stream().map(view -> view.getContext().activeCount()).collect(Collectors.toList())); - System.out.println("View has stabilized in " + (System.currentTimeMillis() - then) + " Ms across all " - + views.size() + " members"); + .map(view -> view.getContext().activeCount() != views.size() ? view : null) + .filter(view -> view != null) + .count() == 0; + }), "view did not stabilize: " + views.stream() + .map(view -> view.getContext().activeCount()) + .collect(Collectors.toList())); + System.out.println( + "View has stabilized in " + (System.currentTimeMillis() - then) + " Ms across all " + views.size() + + " members"); System.out.println("Checking views for consistency"); var failed = views.stream() - .filter(e -> e.getContext().activeCount() != views.size()) - .map(v -> String.format("%s : %s ", v.getNode().getId(), v.getContext().activeCount())) - .toList(); + .filter(e -> e.getContext().activeCount() != views.size()) + .map(v -> String.format("%s : %s ", v.getNode().getId(), v.getContext().activeCount())) + .toList(); assertEquals(0, failed.size(), - " expected: " + views.size() + " failed: " + failed.size() + " views: " + failed); + " expected: " + views.size() + " failed: " + failed.size() + " views: " + failed); System.out.println("Stoping views"); views.forEach(view -> view.stop()); ConsoleReporter.forRegistry(node0Registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); } private Function clientContextSupplier() { @@ -200,7 +202,7 @@ public SslContext forClient(ClientAuth clientAuth, String alias, CertificateVali String tlsVersion) { CertificateWithPrivateKey certWithKey = certs.get(m.getId()); return MtlsServer.forClient(clientAuth, alias, certWithKey.getX509Certificate(), - certWithKey.getPrivateKey(), validator); + certWithKey.getPrivateKey(), validator); } }; }; @@ -212,15 +214,15 @@ private ServerContextSupplier serverContextSupplier(CertificateWithPrivateKey ce public SslContext forServer(ClientAuth clientAuth, String alias, CertificateValidator validator, Provider provider) { return MtlsServer.forServer(clientAuth, alias, certWithKey.getX509Certificate(), - certWithKey.getPrivateKey(), validator); + certWithKey.getPrivateKey(), validator); } @Override public Digest getMemberId(X509Certificate key) { return ((SelfAddressingIdentifier) Stereotomy.decode(key) - .get() - .coordinates() - .getIdentifier()).getDigest(); + .get() + .coordinates() + .getIdentifier()).getDigest(); } }; } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index c6e35aac9..1f6122f3e 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -8,6 +8,7 @@ import java.net.SocketAddress; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.grpc.client.ConcurrencyLimitClientInterceptor; @@ -26,11 +27,12 @@ * */ public class MtlsClient { + private final static Executor exec = Executors.newVirtualThreadPerTaskExecutor(); private final ManagedChannel channel; public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, ClientContextSupplier supplier, - CertificateValidator validator, Executor exec) { + CertificateValidator validator) { Limiter limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build(); channel = NettyChannelBuilder.forAddress(address) diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java index dc5599eda..44499db15 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java @@ -54,17 +54,15 @@ public class MtlsServer implements RouterSupplier { private final LoadingCache cachedMembership; private final Function contextSupplier; private final EndpointProvider epProvider; - private final Executor exec; private final Member from; private final Context.Key sslSessionContext = Context.key("SSLSession"); private final ServerContextSupplier supplier; public MtlsServer(Member from, EndpointProvider epProvider, Function contextSupplier, - ServerContextSupplier supplier, Executor exec) { + ServerContextSupplier supplier) { this.from = from; this.epProvider = epProvider; this.contextSupplier = contextSupplier; - this.exec = exec; this.supplier = supplier; cachedMembership = CacheBuilder.newBuilder().build(new CacheLoader() { @Override @@ -167,7 +165,7 @@ public Digest getFrom() { private ManagedChannel connectTo(Member to) { return new MtlsClient(epProvider.addressFor(to), epProvider.getClientAuth(), epProvider.getAlias(), - contextSupplier.apply(from), epProvider.getValiator(), exec).getChannel(); + contextSupplier.apply(from), epProvider.getValiator()).getChannel(); } private X509Certificate getCert() { diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java index 779eeaaf1..999daac31 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java @@ -64,7 +64,7 @@ public void bind() throws Exception { var clientMember = new ControlledIdentifierMember(stereotomy.newIdentifier()); var builder = ServerConnectionCache.newBuilder(); - final var exec = Executors.newFixedThreadPool(3, Thread.ofVirtual().factory()); + final var exec = Executors.newVirtualThreadPerTaskExecutor(); serverRouter = new LocalServer(prefix, serverMember).router(builder); clientRouter = new LocalServer(prefix, clientMember).router(builder); diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java index c2125297d..4d9188f67 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java @@ -65,7 +65,7 @@ public void observer() throws Exception { var clientMember = new ControlledIdentifierMember(stereotomy.newIdentifier()); var builder = ServerConnectionCache.newBuilder(); - final var exec = Executors.newFixedThreadPool(3, Thread.ofVirtual().factory()); + final var exec = Executors.newVirtualThreadPerTaskExecutor(); serverRouter = new LocalServer(prefix, serverMember).router(builder); clientRouter = new LocalServer(prefix, clientMember).router(builder); diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java index f87c2d45d..cb9a74aa5 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java @@ -61,7 +61,7 @@ public void validation() throws Exception { var clientMember = new ControlledIdentifierMember(stereotomy.newIdentifier()); var builder = ServerConnectionCache.newBuilder(); - final var exec = Executors.newFixedThreadPool(3, Thread.ofVirtual().factory()); + final var exec = Executors.newVirtualThreadPerTaskExecutor(); serverRouter = new LocalServer(prefix, serverMember).router(builder); clientRouter = new LocalServer(prefix, clientMember).router(builder); diff --git a/thoth/src/test/resources/logback-test.xml b/thoth/src/test/resources/logback-test.xml index 147b32f2e..07bb88f65 100644 --- a/thoth/src/test/resources/logback-test.xml +++ b/thoth/src/test/resources/logback-test.xml @@ -35,51 +35,11 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +