diff --git a/eclipse-format.xml b/eclipse-format.xml index fb910b61b..d82475235 100644 --- a/eclipse-format.xml +++ b/eclipse-format.xml @@ -245,7 +245,7 @@ - + diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Demultiplexer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Demultiplexer.java index 2a9daeacc..f0618ce71 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Demultiplexer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Demultiplexer.java @@ -53,7 +53,7 @@ public ServerCall.Listener interceptCall(ServerCall next) { String route = requestHeaders.get(routing); if (route == null) { - log.error("No route id in call header: {}", routing.name()); + log.error("No route in call header: {}", routing.name()); throw new StatusRuntimeException(Status.UNKNOWN.withDescription("No route ID in call, missing header: " + routing.name())); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index 9fd4d6e60..e9138595f 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -57,6 +57,10 @@ * */ public class Enclave implements RouterSupplier { + interface RoutingClientIdentity extends ClientIdentity { + Digest getAgent(); + } + private final static Class channelType = getChannelType(); private static final Logger log = LoggerFactory.getLogger(Enclave.class); @@ -92,7 +96,7 @@ public DomainSocketAddress getEndpoint() { @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, Executor executor, - LimitsRegistry limitsRegistry) { + LimitsRegistry limitsRegistry) { var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); @@ -107,12 +111,18 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • Status.RESOURCE_EXHAUSTED.withDescription("Enclave server concurrency limit reached")) .build()) .intercept(serverInterceptor()); - return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)), new ClientIdentity() { - @Override - public Digest getFrom() { - return Router.SERVER_CLIENT_ID_KEY.get(); - } - }, contextRegistration, executor); + return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)), + new RoutingClientIdentity() { + @Override + public Digest getAgent() { + return Router.SERVER_AGENT_ID_KEY.get(); + } + + @Override + public Digest getFrom() { + return Router.SERVER_CLIENT_ID_KEY.get(); + } + }, contextRegistration, executor); } private ManagedChannel connectTo(Member to) { @@ -151,7 +161,14 @@ public ServerCall.Listener interceptCall(ServerCall { private final static Class channelType = getChannelType(); - private final DomainSocketAddress bridge; - private final EventLoopGroup eventLoopGroup = getEventLoopGroup(); - private final Demultiplexer inbound; - private final Duration keepAlive; - private final Demultiplexer outbound; + private final String agent; + private final EventLoopGroup eventLoopGroup = getEventLoopGroup(); + private final Demultiplexer inbound; + private final Duration keepAlive; + private final Demultiplexer outbound; - public Portal(ServerBuilder inbound, Function outbound, DomainSocketAddress bridge, - Executor executor, Duration keepAlive, Function router) { + public Portal(Digest agent, ServerBuilder inbound, Function outbound, + DomainSocketAddress bridge, Executor executor, Duration keepAlive, + Function router) { this.inbound = new Demultiplexer(inbound, Router.METADATA_CONTEXT_KEY, d -> handler(router.apply(d))); this.outbound = new Demultiplexer(NettyServerBuilder.forAddress(bridge) .executor(executor) @@ -56,8 +66,8 @@ public Portal(ServerBuilder inbound, Function outboun .bossEventLoopGroup(getEventLoopGroup()) .intercept(new DomainSocketServerInterceptor()), Router.METADATA_TARGET_KEY, outbound); - this.bridge = bridge; this.keepAlive = keepAlive; + this.agent = QualifiedBase64.qb64(agent); } public void close(Duration await) { @@ -65,24 +75,31 @@ public void close(Duration await) { outbound.close(await); } - /** - * - * @return the domain socket address for outbound demultiplexing - */ - public DomainSocketAddress getBridge() { - return bridge; - } - public void start() throws IOException { outbound.start(); inbound.start(); } private ManagedChannel handler(DomainSocketAddress address) { + var clientInterceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel next) { + ClientCall newCall = next.newCall(method, callOptions); + return new SimpleForwardingClientCall(newCall) { + @Override + public void start(Listener responseListener, Metadata headers) { + headers.put(Router.METADATA_AGENT_KEY, agent); + super.start(responseListener, headers); + } + }; + } + }; return NettyChannelBuilder.forAddress(address) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .keepAliveTime(keepAlive.toNanos(), TimeUnit.NANOSECONDS) + .intercept(clientInterceptor) .usePlaintext() .build(); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Router.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Router.java index 851a4408f..eab392eb1 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Router.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Router.java @@ -25,14 +25,6 @@ */ public interface Router { - public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER = "com.salesforce.apollo.archipeligo.to.id.server"; - public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER = "com.salesforce.apollo.archipeligo.context.id.server"; - public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER = "com.salesforce.apollo.archipeligo.from.id.server"; - public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID = "com.salesforce.apollo.archipeligo.to.id"; - public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID = "com.salesforce.apollo.archipeligo.context.id"; - public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID = "com.salesforce.apollo.archipeligo.from.id"; - public static final String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT = "com.salesforce.apollo.archipeligo.from.id.client"; - @FunctionalInterface interface ClientConnector { Client connect(Member to); @@ -44,16 +36,28 @@ default String routing() { } } - Context.Key CLIENT_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT); - Metadata.Key METADATA_CLIENT_ID_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID, - Metadata.ASCII_STRING_MARSHALLER); - Metadata.Key METADATA_CONTEXT_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID, - Metadata.ASCII_STRING_MARSHALLER); - Metadata.Key METADATA_TARGET_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID, - Metadata.ASCII_STRING_MARSHALLER); - Context.Key SERVER_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER); - Context.Key SERVER_CONTEXT_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER); - Context.Key SERVER_TARGET_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER); + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID = "agent.id"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID_SERVER = "agent.id.server"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID = "context.id"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER = "context.id.server"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID = "from.id"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT = "from.id.client"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER = "from.id.server"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID = "to.id"; + String COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER = "to.id.server"; + Context.Key CLIENT_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_CLIENT); + Metadata.Key METADATA_AGENT_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID, + Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key METADATA_CLIENT_ID_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID, + Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key METADATA_CONTEXT_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID, + Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key METADATA_TARGET_KEY = Metadata.Key.of(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID, + Metadata.ASCII_STRING_MARSHALLER); + Context.Key SERVER_AGENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_AGENT_ID_SERVER); + Context.Key SERVER_CLIENT_ID_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_FROM_ID_SERVER); + Context.Key SERVER_CONTEXT_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_CONTEXT_ID_SERVER); + Context.Key SERVER_TARGET_KEY = Context.key(COM_SALESFORCE_APOLLO_ARCHIPELIGO_TO_ID_SERVER); void close(Duration await); diff --git a/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java b/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java index ba94c7b5b..a55df834b 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java @@ -37,23 +37,14 @@ import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.archipelago.Portal; import com.salesforce.apollo.archipelago.RoutableService; -import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor; -import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.impl.SigningMemberImpl; import com.salesforce.apollo.utils.Utils; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; @@ -134,23 +125,6 @@ public static interface TestItService extends Link { private final static Class channelType = getChannelType(); - public static ClientInterceptor clientInterceptor(Digest ctx) { - return new ClientInterceptor() { - @Override - public ClientCall interceptCall(MethodDescriptor method, - CallOptions callOptions, Channel next) { - ClientCall newCall = next.newCall(method, callOptions); - return new SimpleForwardingClientCall(newCall) { - @Override - public void start(Listener responseListener, Metadata headers) { - headers.put(Router.METADATA_CLIENT_ID_KEY, qb64(ctx)); - super.start(responseListener, headers); - } - }; - } - }; - } - private EventLoopGroup eventLoopGroup; private final TestItService local = new TestItService() { @@ -198,7 +172,9 @@ public void smokin() throws Exception { final var portalEndpoint = new DomainSocketAddress(Path.of("target") .resolve(UUID.randomUUID().toString()) .toFile()); - final var portal = new Portal<>(NettyServerBuilder.forAddress(portalEndpoint) + final var agent = DigestAlgorithm.DEFAULT.getLast(); + final var portal = new Portal<>(agent, + NettyServerBuilder.forAddress(portalEndpoint) .protocolNegotiator(new DomainSocketNegotiator()) .channelType(getServerDomainSocketChannelClass()) .workerEventLoopGroup(getEventLoopGroup()) diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index f879a7a39..4e4a42dc9 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -135,7 +135,8 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu bridge = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); portalEndpoint = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString()) .toFile()); - portal = new Portal(NettyServerBuilder.forAddress(portalEndpoint) + portal = new Portal(member.getId(), + NettyServerBuilder.forAddress(portalEndpoint) .protocolNegotiator(new DomainSocketNegotiator()) .channelType(getServerDomainSocketChannelClass()) .workerEventLoopGroup(portalEventLoopGroup) diff --git a/model/src/main/java/com/salesforce/apollo/model/SubDomain.java b/model/src/main/java/com/salesforce/apollo/model/SubDomain.java index e6b212daf..ecb7ffb11 100644 --- a/model/src/main/java/com/salesforce/apollo/model/SubDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/SubDomain.java @@ -50,13 +50,13 @@ * */ public class SubDomain extends Domain { - private static final String DELEGATES_MAP_TEMPLATE = "delegates-%s"; - private static final String DELEGATORS_MAP_TEMPLATE = "delegators-%s"; - private final static Logger log = LoggerFactory.getLogger(SubDomain.class); + private static final String DELEGATES_MAP_TEMPLATE = "delegates-%s"; + private static final String DELEGATIONS_MAP_TEMPLATE = "delegations-%s"; + private final static Logger log = LoggerFactory.getLogger(SubDomain.class); private final MVMap delegates; @SuppressWarnings("unused") - private final MVMap delegators; + private final MVMap delegations; private final double fpr; private final Duration gossipInterval; private final int maxTransfer; @@ -87,10 +87,10 @@ public SubDomain(ControlledIdentifierMember member, Builder prm, String dbURL, P builder.setFileName(checkpointBaseDir.resolve(identifier).toFile()); store = builder.build(); delegates = store.openMap(DELEGATES_MAP_TEMPLATE.formatted(identifier)); - delegators = store.openMap(DELEGATORS_MAP_TEMPLATE.formatted(identifier)); + delegations = store.openMap(DELEGATIONS_MAP_TEMPLATE.formatted(identifier)); CommonCommunications comms = params.communications() .create(member, params.context().getId(), delegation(), - "delegation", + "delegates", r -> new DelegationServer(params.communications() .getClientIdentityProvider(), r, null)); diff --git a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java index e75f9ad97..39694653a 100644 --- a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java @@ -215,6 +215,7 @@ public void portal() throws Exception { final var ctxB = DigestAlgorithm.DEFAULT.getLast().prefix(0x666); var serverMember1 = new SigningMemberImpl(Utils.getMember(0)); var serverMember2 = new SigningMemberImpl(Utils.getMember(1)); + final var bridge = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); final var exec = Executors.newVirtualThreadPerTaskExecutor(); @@ -223,7 +224,8 @@ public void portal() throws Exception { .resolve(UUID.randomUUID().toString()) .toFile()); final var routes = new HashMap(); - final var portal = new Portal<>(NettyServerBuilder.forAddress(portalEndpoint) + final var portal = new Portal<>(serverMember1.getId(), + NettyServerBuilder.forAddress(portalEndpoint) .protocolNegotiator(new DomainSocketNegotiator()) .channelType(getServerDomainSocketChannelClass()) .workerEventLoopGroup(getEventLoopGroup())