Skip to content

Commit

Permalink
further thread pool clean up. Fix BootstrappingTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Nov 1, 2023
1 parent ba0321a commit f61ea27
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 298 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@
* @author hal.hildebrand
*/
public class Enclave implements RouterSupplier {
protected final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static Class<? extends io.netty.channel.Channel> channelType = getChannelType();
private static final Logger log = LoggerFactory.getLogger(
Enclave.class);
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static Class<? extends io.netty.channel.Channel> channelType = getChannelType();
private static final Logger log = LoggerFactory.getLogger(Enclave.class);

private final DomainSocketAddress bridge;
private final Consumer<Digest> contextRegistration;
Expand Down Expand Up @@ -103,7 +102,7 @@ public Digest getAgent() {
public Digest getFrom() {
return Router.SERVER_CLIENT_ID_KEY.get();
}
}, contextRegistration, executor);
}, contextRegistration);
}

private ManagedChannel connectTo(Member to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* @author hal.hildebrand
*/
public class LocalServer implements RouterSupplier {
private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private static final Logger log = LoggerFactory.getLogger(LocalServer.class);
private static final String NAME_TEMPLATE = "%s-%s";

Expand Down Expand Up @@ -87,7 +87,8 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Li
public Digest getFrom() {
return Router.SERVER_CLIENT_ID_KEY.get();
}
}, executor);
}, d -> {
});
}

private ManagedChannel connectTo(Member to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,27 @@
*/
package com.salesforce.apollo.archipelago;

import static com.salesforce.apollo.archipelago.Router.SERVER_CONTEXT_KEY;
import com.salesforce.apollo.crypto.Digest;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.salesforce.apollo.crypto.Digest;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import static com.salesforce.apollo.archipelago.Router.SERVER_CONTEXT_KEY;

/**
* Service implementation routable by Digest context
*
* @author hal.hildebrand
*
*/
public class RoutableService<Service> {
private static final Logger log = LoggerFactory.getLogger(RoutableService.class);

private final Executor executor;
private final Map<Digest, Service> services = new ConcurrentHashMap<>();

public RoutableService(Executor executor) {
this.executor = executor;
}
private static final Logger log = LoggerFactory.getLogger(RoutableService.class);
private final Map<Digest, Service> services = new ConcurrentHashMap<>();

public void bind(Digest context, Service service) {
services.put(context, service);
Expand All @@ -54,14 +44,12 @@ public void evaluate(StreamObserver<?> responseObserver, Consumer<Service> c) {
log.trace("No service for context {}", context);
responseObserver.onError(new StatusRuntimeException(Status.NOT_FOUND));
} else {
executor.execute(() -> {
try {
c.accept(service);
} catch (Throwable t) {
log.error("Uncaught exception in service evaluation for context: {}", context, t);
responseObserver.onError(t);
}
});
try {
c.accept(service);
} catch (Throwable t) {
log.error("Uncaught exception in service evaluation for context: {}", context, t);
responseObserver.onError(t);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,111 +6,58 @@
*/
package com.salesforce.apollo.archipelago;

import static com.salesforce.apollo.crypto.QualifiedBase64.digest;
import static com.salesforce.apollo.crypto.QualifiedBase64.qb64;
import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.limit.AIMDLimit;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.protocols.ClientIdentity;
import io.grpc.*;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.util.MutableHandlerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.limit.AIMDLimit;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.protocols.ClientIdentity;

import io.grpc.BindableService;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.util.MutableHandlerRegistry;
import static com.salesforce.apollo.crypto.QualifiedBase64.digest;
import static com.salesforce.apollo.crypto.QualifiedBase64.qb64;

/**
* Context based GRPC routing
*
* @author hal.hildebrand
*
*/
public class RouterImpl implements Router {

public class CommonCommunications<Client extends Link, Service> implements Router.ClientConnector<Client> {
public static <Client> Client vanilla(Member from) {
@SuppressWarnings("unchecked")
Client client = (Client) new Link() {

@Override
public void close() throws IOException {
}

@Override
public Member getMember() {
return from;
}
};
return client;
}

private final Digest context;
private final CreateClientCommunications<Client> createFunction;
private final Member from;
private final Client localLoopback;

private final RoutableService<Service> routing;

public <T extends Member> CommonCommunications(Digest context, Member from, RoutableService<Service> routing) {
this(context, from, routing, m -> vanilla(from), vanilla(from));

}

public <T extends Member> CommonCommunications(Digest context, Member from, RoutableService<Service> routing,
CreateClientCommunications<Client> createFunction,
Client localLoopback) {
this.context = context;
this.routing = routing;
this.createFunction = createFunction;
this.localLoopback = localLoopback;
this.from = from;
}

@Override
public Client connect(Member to) {
if (to == null) {
return null;
}
return started.get() ? (to.equals(from) ? localLoopback : cache.borrow(context, to, createFunction)) : null;
}

public void deregister(Digest context) {
routing.unbind(context);
}

public void register(Digest context, Service service) {
routing.bind(context, service);
}
}

private final static Logger log = LoggerFactory.getLogger(RouterImpl.class);
private final ServerConnectionCache cache;
private final ClientIdentity clientIdentityProvider;
private final Consumer<Digest> contextRegistration;
private final Member from;
private final MutableHandlerRegistry registry = new MutableHandlerRegistry();
private final Server server;
private final Map<String, RoutableService<?>> services = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider) {
this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> {});
}
public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider, Consumer<Digest> contextRegistration) {
this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build();
this.cache = cacheBuilder.build();
this.clientIdentityProvider = clientIdentityProvider;
this.contextRegistration = contextRegistration;
this.from = from;
}

public static ClientInterceptor clientInterceptor(Digest ctx) {
return new ClientInterceptor() {
Expand Down Expand Up @@ -151,37 +98,6 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
};
}

private final ServerConnectionCache cache;
private final ClientIdentity clientIdentityProvider;
private final Consumer<Digest> contextRegistration;
private final Executor executor;
private final Member from;
private final MutableHandlerRegistry registry = new MutableHandlerRegistry();
private final Server server;
private final Map<String, RoutableService<?>> services = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();

public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider) {
this(from, serverBuilder, cacheBuilder, clientIdentityProvider, r -> r.run());
}

public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider, Consumer<Digest> contextRegistration, Executor executor) {
this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build();
this.cache = cacheBuilder.build();
this.clientIdentityProvider = clientIdentityProvider;
this.contextRegistration = contextRegistration;
this.executor = executor;
this.from = from;
}

public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider, Executor executor) {
this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> {
}, executor);
}

@Override
public void close(Duration await) {
if (!started.compareAndSet(true, false)) {
Expand All @@ -197,12 +113,9 @@ public void close(Duration await) {
}

@Override
public <Client extends Link, Service extends Router.ServiceRouting> CommonCommunications<Client, Service> create(Member member,
Digest context,
Service service,
Function<RoutableService<Service>, BindableService> factory,
CreateClientCommunications<Client> createFunction,
Client localLoopback) {
public <Client extends Link, Service extends Router.ServiceRouting> CommonCommunications<Client, Service> create(
Member member, Digest context, Service service, Function<RoutableService<Service>, BindableService> factory,
CreateClientCommunications<Client> createFunction, Client localLoopback) {
return create(member, context, service, service.routing(), factory, createFunction, localLoopback);
}

Expand All @@ -213,7 +126,7 @@ public <Service, Client extends Link> CommonCommunications<Client, Service> crea
Function<RoutableService<Service>, BindableService> factory) {
@SuppressWarnings("unchecked")
RoutableService<Service> routing = (RoutableService<Service>) services.computeIfAbsent(routingLabel, c -> {
var route = new RoutableService<Service>(executor);
var route = new RoutableService<Service>();
BindableService bindableService = factory.apply(route);
registry.addService(bindableService);
return route;
Expand All @@ -233,7 +146,7 @@ public <Client extends Link, Service> CommonCommunications<Client, Service> crea
Client localLoopback) {
@SuppressWarnings("unchecked")
RoutableService<Service> routing = (RoutableService<Service>) services.computeIfAbsent(routingLabel, c -> {
var route = new RoutableService<Service>(executor);
var route = new RoutableService<Service>();
BindableService bindableService = factory.apply(route);
registry.addService(bindableService);
return route;
Expand Down Expand Up @@ -266,4 +179,59 @@ public void start() {
}
log.info("Started router: {}", server.getListenSockets());
}

public class CommonCommunications<Client extends Link, Service> implements Router.ClientConnector<Client> {
private final Digest context;
private final CreateClientCommunications<Client> createFunction;
private final Member from;
private final Client localLoopback;
private final RoutableService<Service> routing;

public <T extends Member> CommonCommunications(Digest context, Member from, RoutableService<Service> routing) {
this(context, from, routing, m -> vanilla(from), vanilla(from));

}

public <T extends Member> CommonCommunications(Digest context, Member from, RoutableService<Service> routing,
CreateClientCommunications<Client> createFunction,
Client localLoopback) {
this.context = context;
this.routing = routing;
this.createFunction = createFunction;
this.localLoopback = localLoopback;
this.from = from;
}

public static <Client> Client vanilla(Member from) {
@SuppressWarnings("unchecked")
Client client = (Client) new Link() {

@Override
public void close() throws IOException {
}

@Override
public Member getMember() {
return from;
}
};
return client;
}

@Override
public Client connect(Member to) {
if (to == null) {
return null;
}
return started.get() ? (to.equals(from) ? localLoopback : cache.borrow(context, to, createFunction)) : null;
}

public void deregister(Digest context) {
routing.unbind(context);
}

public void register(Digest context, Service service) {
routing.bind(context, service);
}
}
}
Loading

0 comments on commit f61ea27

Please sign in to comment.