Skip to content

Commit

Permalink
further thread pool clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Oct 28, 2023
1 parent 944d98d commit ba0321a
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void func() throws Exception {

@Test
public void scalingTest() throws Exception {
var exec = Executors.newFixedThreadPool(2);
var exec = Executors.newVirtualThreadPerTaskExecutor();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
Context<Member> context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin(), 9, 0.2, 3);
var entropy = SecureRandom.getInstance("SHA1PRNG");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ private void one(int iteration, List<ThreadPoolExecutor> consumers)
var ds = new SimpleDataSource();
final short pid = i;
List<PreBlock> output = produced.get(pid);
final var exec = Executors.newFixedThreadPool(2, Thread.ofVirtual().factory());
executors.add(exec);
final var member = members.get(i);
var com = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder());
comms.add(com);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,24 @@
* @since 220
*/
public class MtlsTest {
private static final int CARDINALITY;
private static final Map<Digest, CertificateWithPrivateKey> certs = new HashMap<>();
private static final Map<Digest, InetSocketAddress> endpoints = new HashMap<>();
private static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests");
private static Map<Digest, ControlledIdentifier<SelfAddressingIdentifier>> identities;
private static final int CARDINALITY;
private static final Map<Digest, CertificateWithPrivateKey> certs = new HashMap<>();
private static final Map<Digest, InetSocketAddress> endpoints = new HashMap<>();
private static final boolean LARGE_TESTS = Boolean.getBoolean(
"large_tests");
private static Map<Digest, ControlledIdentifier<SelfAddressingIdentifier>> identities;

static {
CARDINALITY = LARGE_TESTS ? 100 : 10;
}

private List<Router> communications = new ArrayList<>();
private List<View> views;
private List<View> views;

@BeforeAll
public static void beforeClass() throws Exception {
var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[]{6, 6, 6});
entropy.setSeed(new byte[] { 6, 6, 6 });
String localhost = InetAddress.getLoopbackAddress().getHostName();
var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy);
identities = IntStream.range(0, CARDINALITY).mapToObj(i -> {
Expand All @@ -86,7 +87,7 @@ public static void beforeClass() throws Exception {
identities.entrySet().forEach(e -> {
InetSocketAddress endpoint = new InetSocketAddress(localhost, Utils.allocatePort());
certs.put(e.getKey(),
e.getValue().provision(Instant.now(), Duration.ofDays(1), SignatureAlgorithm.DEFAULT));
e.getValue().provision(Instant.now(), Duration.ofDays(1), SignatureAlgorithm.DEFAULT));
endpoints.put(e.getKey(), endpoint);
});
}
Expand Down Expand Up @@ -114,9 +115,9 @@ public void smoke() throws Exception {
var ctxBuilder = Context.<Participant>newBuilder().setCardinality(CARDINALITY);

var seeds = members.stream()
.map(m -> new Seed(m.getEvent().getCoordinates(), endpoints.get(m.getId())))
.limit(LARGE_TESTS ? 24 : 3)
.toList();
.map(m -> new Seed(m.getEvent().getCoordinates(), endpoints.get(m.getId())))
.limit(LARGE_TESTS ? 24 : 3)
.toList();

var builder = ServerConnectionCache.newBuilder().setTarget(30);
var frist = new AtomicBoolean(true);
Expand All @@ -126,17 +127,16 @@ public void smoke() throws Exception {
views = members.stream().map(node -> {
Context<Participant> context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
frist.getAndSet(false) ? node0Registry : registry);
frist.getAndSet(false) ? node0Registry : registry);
EndpointProvider ep = new StandardEpProvider(endpoints.get(node.getId()), ClientAuth.REQUIRE,
CertificateValidator.NONE, resolver);
CertificateValidator.NONE, resolver);
builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry));
CertificateWithPrivateKey certWithKey = certs.get(node.getId());
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey),
Executors.newFixedThreadPool(2, Thread.ofVirtual().factory())).router(
builder);
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router(
builder);
communications.add(comms);
return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, comms, parameters,
DigestAlgorithm.DEFAULT, metrics);
DigestAlgorithm.DEFAULT, metrics);
}).collect(Collectors.toList());

var then = System.currentTimeMillis();
Expand All @@ -145,8 +145,8 @@ public void smoke() throws Exception {
var countdown = new AtomicReference<>(new CountDownLatch(1));

views.get(0)
.start(() -> countdown.get().countDown(), duration, Collections.emptyList(),
Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()));
.start(() -> countdown.get().countDown(), duration, Collections.emptyList(),
Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()));

assertTrue(countdown.get().await(30, TimeUnit.SECONDS), "KERNEL did not stabilize");

Expand All @@ -156,40 +156,42 @@ public void smoke() throws Exception {
countdown.set(new CountDownLatch(seedlings.size()));

seedlings.forEach(view -> view.start(() -> countdown.get().countDown(), duration, kernel,
Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())));
Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())));

assertTrue(countdown.get().await(30, TimeUnit.SECONDS), "Seeds did not stabilize");

countdown.set(new CountDownLatch(views.size() - seeds.size()));
views.forEach(view -> view.start(() -> countdown.get().countDown(), duration, seeds,
Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())));
Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())));

assertTrue(Utils.waitForCondition(120_000, 1_000, () -> {
return views.stream()
.map(view -> view.getContext().activeCount() != views.size() ? view : null)
.filter(view -> view != null)
.count() == 0;
}), "view did not stabilize: "
+ views.stream().map(view -> view.getContext().activeCount()).collect(Collectors.toList()));
System.out.println("View has stabilized in " + (System.currentTimeMillis() - then) + " Ms across all "
+ views.size() + " members");
.map(view -> view.getContext().activeCount() != views.size() ? view : null)
.filter(view -> view != null)
.count() == 0;
}), "view did not stabilize: " + views.stream()
.map(view -> view.getContext().activeCount())
.collect(Collectors.toList()));
System.out.println(
"View has stabilized in " + (System.currentTimeMillis() - then) + " Ms across all " + views.size()
+ " members");

System.out.println("Checking views for consistency");
var failed = views.stream()
.filter(e -> e.getContext().activeCount() != views.size())
.map(v -> String.format("%s : %s ", v.getNode().getId(), v.getContext().activeCount()))
.toList();
.filter(e -> e.getContext().activeCount() != views.size())
.map(v -> String.format("%s : %s ", v.getNode().getId(), v.getContext().activeCount()))
.toList();
assertEquals(0, failed.size(),
" expected: " + views.size() + " failed: " + failed.size() + " views: " + failed);
" expected: " + views.size() + " failed: " + failed.size() + " views: " + failed);

System.out.println("Stoping views");
views.forEach(view -> view.stop());

ConsoleReporter.forRegistry(node0Registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}

private Function<Member, ClientContextSupplier> clientContextSupplier() {
Expand All @@ -200,7 +202,7 @@ public SslContext forClient(ClientAuth clientAuth, String alias, CertificateVali
String tlsVersion) {
CertificateWithPrivateKey certWithKey = certs.get(m.getId());
return MtlsServer.forClient(clientAuth, alias, certWithKey.getX509Certificate(),
certWithKey.getPrivateKey(), validator);
certWithKey.getPrivateKey(), validator);
}
};
};
Expand All @@ -212,15 +214,15 @@ private ServerContextSupplier serverContextSupplier(CertificateWithPrivateKey ce
public SslContext forServer(ClientAuth clientAuth, String alias, CertificateValidator validator,
Provider provider) {
return MtlsServer.forServer(clientAuth, alias, certWithKey.getX509Certificate(),
certWithKey.getPrivateKey(), validator);
certWithKey.getPrivateKey(), validator);
}

@Override
public Digest getMemberId(X509Certificate key) {
return ((SelfAddressingIdentifier) Stereotomy.decode(key)
.get()
.coordinates()
.getIdentifier()).getDigest();
.get()
.coordinates()
.getIdentifier()).getDigest();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.grpc.client.ConcurrencyLimitClientInterceptor;
Expand All @@ -26,11 +27,12 @@
*
*/
public class MtlsClient {
private final static Executor exec = Executors.newVirtualThreadPerTaskExecutor();

private final ManagedChannel channel;

public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, ClientContextSupplier supplier,
CertificateValidator validator, Executor exec) {
CertificateValidator validator) {

Limiter<GrpcClientRequestContext> limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build();
channel = NettyChannelBuilder.forAddress(address)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,15 @@ public class MtlsServer implements RouterSupplier {
private final LoadingCache<X509Certificate, Digest> cachedMembership;
private final Function<Member, ClientContextSupplier> contextSupplier;
private final EndpointProvider epProvider;
private final Executor exec;
private final Member from;
private final Context.Key<SSLSession> sslSessionContext = Context.key("SSLSession");
private final ServerContextSupplier supplier;

public MtlsServer(Member from, EndpointProvider epProvider, Function<Member, ClientContextSupplier> contextSupplier,
ServerContextSupplier supplier, Executor exec) {
ServerContextSupplier supplier) {
this.from = from;
this.epProvider = epProvider;
this.contextSupplier = contextSupplier;
this.exec = exec;
this.supplier = supplier;
cachedMembership = CacheBuilder.newBuilder().build(new CacheLoader<X509Certificate, Digest>() {
@Override
Expand Down Expand Up @@ -167,7 +165,7 @@ public Digest getFrom() {

private ManagedChannel connectTo(Member to) {
return new MtlsClient(epProvider.addressFor(to), epProvider.getClientAuth(), epProvider.getAlias(),
contextSupplier.apply(from), epProvider.getValiator(), exec).getChannel();
contextSupplier.apply(from), epProvider.getValiator()).getChannel();
}

private X509Certificate getCert() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void bind() throws Exception {
var clientMember = new ControlledIdentifierMember(stereotomy.newIdentifier());

var builder = ServerConnectionCache.newBuilder();
final var exec = Executors.newFixedThreadPool(3, Thread.ofVirtual().factory());
final var exec = Executors.newVirtualThreadPerTaskExecutor();
serverRouter = new LocalServer(prefix, serverMember).router(builder);
clientRouter = new LocalServer(prefix, clientMember).router(builder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void observer() throws Exception {
var clientMember = new ControlledIdentifierMember(stereotomy.newIdentifier());

var builder = ServerConnectionCache.newBuilder();
final var exec = Executors.newFixedThreadPool(3, Thread.ofVirtual().factory());
final var exec = Executors.newVirtualThreadPerTaskExecutor();
serverRouter = new LocalServer(prefix, serverMember).router(builder);
clientRouter = new LocalServer(prefix, clientMember).router(builder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void validation() throws Exception {
var clientMember = new ControlledIdentifierMember(stereotomy.newIdentifier());

var builder = ServerConnectionCache.newBuilder();
final var exec = Executors.newFixedThreadPool(3, Thread.ofVirtual().factory());
final var exec = Executors.newVirtualThreadPerTaskExecutor();
serverRouter = new LocalServer(prefix, serverMember).router(builder);
clientRouter = new LocalServer(prefix, clientMember).router(builder);

Expand Down
42 changes: 1 addition & 41 deletions thoth/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,51 +35,11 @@
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.ring.SliceIterator" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.ring.RingIterator" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.ring.RingCommunications" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.membership" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.stereotomy" level="trace" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.stereotomy.db" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.thoth" level="trace" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.thoth.KerlDHT" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.thoth.KerlSpace" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.thoth.grpc" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.thoth.metrics" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.gorgoneion" level="trace" additivity="false">
<logger name="com.salesforce.apollo.thoth" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

Expand Down

0 comments on commit ba0321a

Please sign in to comment.