From eae25e71679f0b6f71f913c994ddca3b2553c2fe Mon Sep 17 00:00:00 2001
From: Hellblazer
Date: Tue, 18 Jun 2024 19:52:13 -0700
Subject: [PATCH] enjoin. replicated join to observers in the cluster

---
Review notes (free text here, before the first diff, is ignored by git-am):
 * ViewManagement.join(): the branch that rejects a join on a non-observer
   keeps its `return`. Without it the handler continued after onError(),
   reaching the view-mismatch check (a possible second onError on the same
   StreamObserver) and the pending-join registration and enjoin fan-out.
 * Hunk offsets for ViewManagement.java and the diffstat are adjusted for
   that one retained line. The post-image blob id on its index line was not
   regenerated.
 * Line breaks and indentation of this patch were reconstructed; verify the
   context lines against the tree before applying.

 .../apollo/fireflies/FireflyMetrics.java      |  3 +-
 .../apollo/fireflies/FireflyMetricsImpl.java  | 12 +++-
 .../com/salesforce/apollo/fireflies/View.java |  4 ++
 .../apollo/fireflies/ViewManagement.java      | 63 +++++++++++++++++--
 .../fireflies/comm/gossip/FfClient.java       | 15 ++---
 .../fireflies/comm/gossip/FfServer.java       | 34 ++++++++--
 .../fireflies/comm/gossip/Fireflies.java      | 12 +++-
 grpc/src/main/proto/fireflies.proto           |  1 +
 8 files changed, 120 insertions(+), 24 deletions(-)

diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetrics.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetrics.java
index 49ac68e90..1b2f23466 100644
--- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetrics.java
+++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetrics.java
@@ -13,7 +13,6 @@
 
 /**
  * @author hal.hildebrand
- *
  */
 public interface FireflyMetrics extends EndpointMetrics {
 
@@ -25,6 +24,8 @@ public interface FireflyMetrics extends EndpointMetrics {
 
     Histogram gossipResponse();
 
+    Timer inboundEnjoinDuration();
+
     Histogram inboundGateway();
 
     Histogram inboundGossip();
diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetricsImpl.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetricsImpl.java
index 1e1f5925f..3583d2443 100644
--- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetricsImpl.java
+++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/FireflyMetricsImpl.java
@@ -6,8 +6,6 @@
  */
 package com.salesforce.apollo.fireflies;
 
-import static com.codahale.metrics.MetricRegistry.name;
-
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
@@ -15,9 +13,10 @@
 import com.salesforce.apollo.cryptography.Digest;
 import com.salesforce.apollo.protocols.EndpointMetricsImpl;
 
+import static com.codahale.metrics.MetricRegistry.name;
+
 /**
  * @author hal.hildebrand
- *
  */
 public class FireflyMetricsImpl extends EndpointMetricsImpl implements FireflyMetrics {
     private final Meter accusations;
@@ -48,6 +47,7 @@ public class FireflyMetricsImpl extends EndpointMetricsImpl implements FireflyMe
     private final Timer seedDuration;
     private final Meter shunnedGossip;
     private final Meter viewChanges;
+    private final Timer inboundEnjoinDuration;
 
     public FireflyMetricsImpl(Digest context, MetricRegistry registry) {
         super(registry);
@@ -80,6 +80,7 @@ public FireflyMetricsImpl(Digest context, MetricRegistry registry) {
         shunnedGossip = registry.meter(name(context.shortString(), "ff.gossip.shunned"));
         inboundSeed = registry.histogram(name(context.shortString(), "ff.seed.inbound.bytes"));
         viewChanges = registry.meter(name(context.shortString(), "ff.view.change"));
+        inboundEnjoinDuration = registry.timer(name(context.shortString(), "ff.enjoin.duration"));
     }
 
     @Override
@@ -102,6 +103,11 @@ public Histogram gossipResponse() {
         return gossipResponse;
     }
 
+    @Override
+    public Timer inboundEnjoinDuration() {
+        return inboundEnjoinDuration;
+    }
+
     @Override
     public Histogram inboundGateway() {
         return inboundGateway;
diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
index eef938c7a..1df306b37 100644
--- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
+++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
@@ -1839,6 +1839,10 @@ boolean setNote(NoteWrapper next) {
 
     public class Service implements EntranceService, FFService, ServiceRouting {
 
+        public void enjoin(Join join, Digest from) {
+            viewManagement.enjoin(join, from);
+        }
+
         /**
          * Asynchronously add a member to the next view
          */
diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java
index 5e18e4338..f070be7ec 100644
--- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java
+++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java
@@ -19,6 +19,7 @@
 import com.salesforce.apollo.fireflies.proto.Update.Builder;
 import com.salesforce.apollo.membership.Member;
 import com.salesforce.apollo.membership.ReservoirSampler;
+import com.salesforce.apollo.ring.SliceIterator;
 import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier;
 import com.salesforce.apollo.utils.Entropy;
 import com.salesforce.apollo.utils.Utils;
@@ -123,6 +124,55 @@ Digest currentView() {
         return currentView.get();
     }
 
+    void enjoin(Join join, Digest observer) {
+        final var joinView = Digest.from(join.getView());
+        final var from = observer;
+        if (!joined()) {
+            log.trace("Not joined, ignored enjoin of view: {} from: {} on: {}", joinView, from, node.getId());
+            throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription(
+            "Not joined, ignored join of view: %s from: %s on: %s".formatted(joinView, from, node.getId())));
+        }
+        var note = new NoteWrapper(join.getNote(), digestAlgo);
+        if (!view.validate(note.getIdentifier())) {
+            log.debug("Ignored enjoin of view: {} from: {} invalid identifier on: {}", joinView, from, node.getId());
+            throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Invalid identifier"));
+        }
+        view.stable(() -> {
+            var thisView = currentView();
+            log.debug("Enjoin requested from: {} view: {} context: {} cardinality: {} on: {}", from, thisView,
+                      context.getId(), cardinality(), node.getId());
+            if (contains(from)) {
+                log.debug("Already a member: {} view: {} context: {} cardinality: {} on: {}", from, thisView,
+                          context.getId(), cardinality(), node.getId());
+                return;
+            }
+            if (!observers.contains(node.getId())) {
+                log.trace("Not observer, ignoring Join from: {} observers: {} on: {}", from, observers, node.getId());
+                throw new StatusRuntimeException(
+                Status.FAILED_PRECONDITION.withDescription("Not observer, ignored join of view"));
+            }
+            if (!thisView.equals(joinView)) {
+                throw new StatusRuntimeException(
+                Status.OUT_OF_RANGE.withDescription("View: " + joinView + " does not match: " + thisView));
+            }
+            if (!View.isValidMask(note.getMask(), context)) {
+                log.warn(
+                "Invalid enjoin mask: {} majority: {} from member: {} view: {} context: {} cardinality: {} on: {}",
+                note.getMask(), context.majority(), from, thisView, context.getId(), cardinality(), node.getId());
+            }
+            if (pendingJoins.size() >= params.maxPending()) {
+                throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("No room at the inn"));
+            }
+            pendingJoins.computeIfAbsent(from, d -> seeds -> {
+                log.info("Gateway established for: {} (enjoined) view: {} context: {} cardinality: {} on: {}", from,
+                         currentView(), context.getId(), cardinality(), node.getId());
+            });
+            joins.put(note.getId(), note);
+            log.debug("Member pending enjoin: {} view: {} context: {} on: {}", from, currentView(), context.getId(),
+                      node.getId());
+        });
+    }
+
     void gc(Participant member) {
         assert member != null;
         view.stable(() -> {
@@ -286,9 +336,10 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time
                 return;
             }
             if (!observers.contains(node.getId())) {
-                log.warn("Join not observer! from: {} observers: {} on: {}", from, observers, node.getId());
-                responseObserver.onNext(Gateway.getDefaultInstance());
+                log.trace("Not observer, ignoring Join from: {} observers: {} on: {}", from, observers, node.getId());
+                responseObserver.onError(new StatusRuntimeException(
+                Status.FAILED_PRECONDITION.withDescription("Not observer, ignored join of view")));
                 return;
             }
             if (!thisView.equals(joinView)) {
                 responseObserver.onError(new StatusRuntimeException(
@@ -313,6 +364,10 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time
             joins.put(note.getId(), note);
             log.debug("Member pending join: {} view: {} context: {} on: {}", from, currentView(), context.getId(),
                       node.getId());
+            var enjoining = new SliceIterator<>("Enjoining[%s:%s]".formatted(currentView(), from), node,
+                                                observers.stream().map(context::getActiveMember).toList(), view.comm);
+            enjoining.iterate(t -> t.enjoin(join), (_, _, _, _) -> true, () -> {
+            }, Duration.ofMillis(1));
         });
     }
 
@@ -378,8 +433,8 @@ boolean joined() {
      */
     void maybeViewChange() {
         if (context.size() == 1 && joins.size() < context.getRingCount() - 1) {
-            log.info("Cannot form cluster: {} with: {} members, required:4 on: {}", currentView(),
-                     joins.size() + context.size(), node.getId());
+            log.trace("Cannot form cluster: {} with: {} members, required > 3 on: {}", currentView(),
+                      joins.size() + context.size(), node.getId());
             view.scheduleViewChange();
             return;
         }
diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfClient.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfClient.java
index fea51e129..f73b62ce3 100644
--- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfClient.java
+++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfClient.java
@@ -10,10 +10,7 @@
 import com.salesforce.apollo.archipelago.ManagedServerChannel;
 import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
 import com.salesforce.apollo.fireflies.FireflyMetrics;
-import com.salesforce.apollo.fireflies.proto.FirefliesGrpc;
-import com.salesforce.apollo.fireflies.proto.Gossip;
-import com.salesforce.apollo.fireflies.proto.SayWhat;
-import com.salesforce.apollo.fireflies.proto.State;
+import com.salesforce.apollo.fireflies.proto.*;
 import com.salesforce.apollo.membership.Member;
 
 /**
@@ -42,6 +39,12 @@ public void close() {
         channel.release();
     }
 
+    @Override
+    public Void enjoin(Join join) {
+        channel.wrap(FirefliesGrpc.newFutureStub(channel)).enjoin(join);
+        return null;
+    }
+
     @Override
     public Member getMember() {
         return channel.getMember();
@@ -63,10 +66,6 @@ public Gossip gossip(SayWhat sw) {
         return result;
     }
 
-    public void release() {
-        close();
-    }
-
     @Override
     public String toString() {
         return String.format("->[%s]", channel.getMember());
diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfServer.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfServer.java
index 3944a7b6b..8ad78e252 100644
--- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfServer.java
+++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/FfServer.java
@@ -8,16 +8,16 @@
 
 import com.codahale.metrics.Timer.Context;
 import com.google.protobuf.Empty;
-import com.salesforce.apollo.fireflies.proto.FirefliesGrpc.FirefliesImplBase;
-import com.salesforce.apollo.fireflies.proto.Gossip;
-import com.salesforce.apollo.fireflies.proto.SayWhat;
-import com.salesforce.apollo.fireflies.proto.State;
 import com.salesforce.apollo.archipelago.RoutableService;
 import com.salesforce.apollo.cryptography.Digest;
 import com.salesforce.apollo.fireflies.FireflyMetrics;
 import com.salesforce.apollo.fireflies.View.Service;
+import com.salesforce.apollo.fireflies.proto.FirefliesGrpc.FirefliesImplBase;
+import com.salesforce.apollo.fireflies.proto.Gossip;
+import com.salesforce.apollo.fireflies.proto.Join;
+import com.salesforce.apollo.fireflies.proto.SayWhat;
+import com.salesforce.apollo.fireflies.proto.State;
 import com.salesforce.apollo.protocols.ClientIdentity;
-
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 
@@ -36,6 +36,29 @@ public FfServer(ClientIdentity identity, RoutableService r, FireflyMetr
         this.router = r;
     }
 
+    @Override
+    public void enjoin(Join request, StreamObserver responseObserver) {
+        Context timer = metrics == null ? null : metrics.inboundEnjoinDuration().time();
+        if (metrics != null) {
+            var serializedSize = request.getSerializedSize();
+            metrics.inboundBandwidth().mark(serializedSize);
+            metrics.inboundGossip().update(serializedSize);
+        }
+        Digest from = identity.getFrom();
+        if (from == null) {
+            responseObserver.onError(new IllegalStateException("Member has been removed"));
+            return;
+        }
+        router.evaluate(responseObserver, s -> {
+            s.enjoin(request, from);
+            responseObserver.onNext(Empty.getDefaultInstance());
+            responseObserver.onCompleted();
+            if (timer != null) {
+                timer.stop();
+            }
+        });
+    }
+
     @Override
     public void gossip(SayWhat request, StreamObserver responseObserver) {
         Context timer = metrics == null ? null : metrics.inboundGossipDuration().time();
@@ -99,5 +122,4 @@ public void update(State request, StreamObserver responseObserver) {
             }
         });
     }
-
 }
diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/Fireflies.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/Fireflies.java
index 5fd6bc2fd..c71c48754 100644
--- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/Fireflies.java
+++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/gossip/Fireflies.java
@@ -6,11 +6,12 @@
  */
 package com.salesforce.apollo.fireflies.comm.gossip;
 
+import com.salesforce.apollo.archipelago.Link;
+import com.salesforce.apollo.fireflies.View.Node;
 import com.salesforce.apollo.fireflies.proto.Gossip;
+import com.salesforce.apollo.fireflies.proto.Join;
 import com.salesforce.apollo.fireflies.proto.SayWhat;
 import com.salesforce.apollo.fireflies.proto.State;
-import com.salesforce.apollo.archipelago.Link;
-import com.salesforce.apollo.fireflies.View.Node;
 import com.salesforce.apollo.membership.Member;
 
 import java.io.IOException;
@@ -27,6 +28,11 @@ static Fireflies getLocalLoopback(Node node) {
             public void close() throws IOException {
             }
 
+            @Override
+            public Void enjoin(Join join) {
+                return null;
+            }
+
             @Override
             public Member getMember() {
                 return node;
@@ -43,6 +49,8 @@ public void update(State state) {
         };
     }
 
+    Void enjoin(Join join);
+
     Gossip gossip(SayWhat sw);
 
     void update(State state);
diff --git a/grpc/src/main/proto/fireflies.proto b/grpc/src/main/proto/fireflies.proto
index 036713cf5..bc6657d10 100644
--- a/grpc/src/main/proto/fireflies.proto
+++ b/grpc/src/main/proto/fireflies.proto
@@ -15,6 +15,7 @@ package fireflies;
 service Fireflies {
     rpc gossip (SayWhat) returns (Gossip) {}
     rpc update (State) returns (google.protobuf.Empty) {}
+    rpc enjoin (Join) returns (google.protobuf.Empty) {}
 }
 
 message SayWhat {