Skip to content

Commit

Permalink
add outer routing agent id to header. general cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Apr 24, 2023
1 parent c57b98c commit a6097bc
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 79 deletions.
2 changes: 1 addition & 1 deletion eclipse-format.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.align_tags_descriptions_grouped" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="80"/>
<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.keep_method_body_on_one_line" value="one_line_never"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.keep_loop_body_block_on_one_line" value="one_line_never"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
ServerCallHandler<ReqT, RespT> next) {
String route = requestHeaders.get(routing);
if (route == null) {
log.error("No route id in call header: {}", routing.name());
log.error("No route in call header: {}", routing.name());
throw new StatusRuntimeException(Status.UNKNOWN.withDescription("No route ID in call, missing header: "
+ routing.name()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
*
*/
public class Enclave implements RouterSupplier {
interface RoutingClientIdentity extends ClientIdentity {
Digest getAgent();
}

private final static Class<? extends io.netty.channel.Channel> channelType = getChannelType();
private static final Logger log = LoggerFactory.getLogger(Enclave.class);

Expand Down Expand Up @@ -92,7 +96,7 @@ public DomainSocketAddress getEndpoint() {

@Override
public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Limit> serverLimit, Executor executor,
LimitsRegistry limitsRegistry) {
LimitsRegistry limitsRegistry) {
var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get());
if (limitsRegistry != null) {
limitsBuilder.metricRegistry(limitsRegistry);
Expand All @@ -107,12 +111,18 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Li
.statusSupplier(() -> Status.RESOURCE_EXHAUSTED.withDescription("Enclave server concurrency limit reached"))
.build())
.intercept(serverInterceptor());
return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)), new ClientIdentity() {
@Override
public Digest getFrom() {
return Router.SERVER_CLIENT_ID_KEY.get();
}
}, contextRegistration, executor);
return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)),
new RoutingClientIdentity() {
@Override
public Digest getAgent() {
return Router.SERVER_AGENT_ID_KEY.get();
}

@Override
public Digest getFrom() {
return Router.SERVER_CLIENT_ID_KEY.get();
}
}, contextRegistration, executor);
}

private ManagedChannel connectTo(Member to) {
Expand Down Expand Up @@ -151,7 +161,14 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
log.error("No member id in call headers: {}", requestHeaders.keys());
throw new IllegalStateException("No member ID in call");
}
Context ctx = Context.current().withValue(Router.SERVER_CLIENT_ID_KEY, digest(id));
String agent = requestHeaders.get(Router.METADATA_AGENT_KEY);
if (agent == null) {
log.error("No agent id in call headers: {}", requestHeaders.keys());
throw new IllegalStateException("No agent ID in call");
}
Context ctx = Context.current()
.withValue(Router.SERVER_AGENT_ID_KEY, digest(agent))
.withValue(Router.SERVER_CLIENT_ID_KEY, digest(id));
return Contexts.interceptCall(ctx, call, requestHeaders, next);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@
import java.util.function.Function;

import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.QualifiedBase64;
import com.salesforce.apollo.membership.Member;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerBuilder;
import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -39,14 +48,15 @@
public class Portal<To extends Member> {
private final static Class<? extends io.netty.channel.Channel> channelType = getChannelType();

private final DomainSocketAddress bridge;
private final EventLoopGroup eventLoopGroup = getEventLoopGroup();
private final Demultiplexer inbound;
private final Duration keepAlive;
private final Demultiplexer outbound;
private final String agent;
private final EventLoopGroup eventLoopGroup = getEventLoopGroup();
private final Demultiplexer inbound;
private final Duration keepAlive;
private final Demultiplexer outbound;

public Portal(ServerBuilder<?> inbound, Function<String, ManagedChannel> outbound, DomainSocketAddress bridge,
Executor executor, Duration keepAlive, Function<String, DomainSocketAddress> router) {
public Portal(Digest agent, ServerBuilder<?> inbound, Function<String, ManagedChannel> outbound,
DomainSocketAddress bridge, Executor executor, Duration keepAlive,
Function<String, DomainSocketAddress> router) {
this.inbound = new Demultiplexer(inbound, Router.METADATA_CONTEXT_KEY, d -> handler(router.apply(d)));
this.outbound = new Demultiplexer(NettyServerBuilder.forAddress(bridge)
.executor(executor)
Expand All @@ -56,33 +66,40 @@ public Portal(ServerBuilder<?> inbound, Function<String, ManagedChannel> outboun
.bossEventLoopGroup(getEventLoopGroup())
.intercept(new DomainSocketServerInterceptor()),
Router.METADATA_TARGET_KEY, outbound);
this.bridge = bridge;
this.keepAlive = keepAlive;
this.agent = QualifiedBase64.qb64(agent);
}

public void close(Duration await) {
inbound.close(await);
outbound.close(await);
}

/**
*
* @return the domain socket address for outbound demultiplexing
*/
public DomainSocketAddress getBridge() {
return bridge;
}

public void start() throws IOException {
outbound.start();
inbound.start();
}

private ManagedChannel handler(DomainSocketAddress address) {
var clientInterceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
ClientCall<ReqT, RespT> newCall = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(newCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(Router.METADATA_AGENT_KEY, agent);
super.start(responseListener, headers);
}
};
}
};
return NettyChannelBuilder.forAddress(address)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
.keepAliveTime(keepAlive.toNanos(), TimeUnit.NANOSECONDS)
.intercept(clientInterceptor)
.usePlaintext()
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@
*/
public interface Router {

public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER = "com.salesforce.apollo.archipeligo.to.id.server";
public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER = "com.salesforce.apollo.archipeligo.context.id.server";
public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER = "com.salesforce.apollo.archipeligo.from.id.server";
public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID = "com.salesforce.apollo.archipeligo.to.id";
public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID = "com.salesforce.apollo.archipeligo.context.id";
public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID = "com.salesforce.apollo.archipeligo.from.id";
public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT = "com.salesforce.apollo.archipeligo.from.id.client";

@FunctionalInterface
interface ClientConnector<Client> {
Client connect(Member to);
Expand All @@ -44,16 +36,28 @@ default String routing() {
}
}

Context.Key<Digest> CLIENT_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT);
Metadata.Key<String> METADATA_CLIENT_ID_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID,
Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> METADATA_CONTEXT_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID,
Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> METADATA_TARGET_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID,
Metadata.ASCII_STRING_MARSHALLER);
Context.Key<Digest> SERVER_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER);
Context.Key<Digest> SERVER_CONTEXT_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER);
Context.Key<Digest> SERVER_TARGET_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER);
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID = "agent.id";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID_SERVER = "agent.id.server";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID = "context.id";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER = "context.id.server";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID = "from.id";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT = "from.id.client";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER = "from.id.server";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID = "to.id";
String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER = "to.id.server";
Context.Key<Digest> CLIENT_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT);
Metadata.Key<String> METADATA_AGENT_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID,
Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> METADATA_CLIENT_ID_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID,
Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> METADATA_CONTEXT_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID,
Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> METADATA_TARGET_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID,
Metadata.ASCII_STRING_MARSHALLER);
Context.Key<Digest> SERVER_AGENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID_SERVER);
Context.Key<Digest> SERVER_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER);
Context.Key<Digest> SERVER_CONTEXT_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER);
Context.Key<Digest> SERVER_TARGET_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER);

void close(Duration await);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,14 @@
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.Portal;
import com.salesforce.apollo.archipelago.RoutableService;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.impl.SigningMemberImpl;
import com.salesforce.apollo.utils.Utils;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
Expand Down Expand Up @@ -134,23 +125,6 @@ public static interface TestItService extends Link {

private final static Class<? extends io.netty.channel.Channel> channelType = getChannelType();

public static ClientInterceptor clientInterceptor(Digest ctx) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
ClientCall<ReqT, RespT> newCall = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(newCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(Router.METADATA_CLIENT_ID_KEY, qb64(ctx));
super.start(responseListener, headers);
}
};
}
};
}

private EventLoopGroup eventLoopGroup;
private final TestItService local = new TestItService() {

Expand Down Expand Up @@ -198,7 +172,9 @@ public void smokin() throws Exception {
final var portalEndpoint = new DomainSocketAddress(Path.of("target")
.resolve(UUID.randomUUID().toString())
.toFile());
final var portal = new Portal<>(NettyServerBuilder.forAddress(portalEndpoint)
final var agent = DigestAlgorithm.DEFAULT.getLast();
final var portal = new Portal<>(agent,
NettyServerBuilder.forAddress(portalEndpoint)
.protocolNegotiator(new DomainSocketNegotiator())
.channelType(getServerDomainSocketChannelClass())
.workerEventLoopGroup(getEventLoopGroup())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu
bridge = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile());
portalEndpoint = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString())
.toFile());
portal = new Portal<Member>(NettyServerBuilder.forAddress(portalEndpoint)
portal = new Portal<Member>(member.getId(),
NettyServerBuilder.forAddress(portalEndpoint)
.protocolNegotiator(new DomainSocketNegotiator())
.channelType(getServerDomainSocketChannelClass())
.workerEventLoopGroup(portalEventLoopGroup)
Expand Down
Loading

0 comments on commit a6097bc

Please sign in to comment.