Skip to content

Commit

Permalink
enjoin. replicated join to observers in the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 19, 2024
1 parent 1f0a47f commit eae25e7
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

/**
* @author hal.hildebrand
*
*/
public interface FireflyMetrics extends EndpointMetrics {

Expand All @@ -25,6 +24,8 @@ public interface FireflyMetrics extends EndpointMetrics {

Histogram gossipResponse();

Timer inboundEnjoinDuration();

Histogram inboundGateway();

Histogram inboundGossip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@
*/
package com.salesforce.apollo.fireflies;

import static com.codahale.metrics.MetricRegistry.name;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.protocols.EndpointMetricsImpl;

import static com.codahale.metrics.MetricRegistry.name;

/**
* @author hal.hildebrand
*
*/
public class FireflyMetricsImpl extends EndpointMetricsImpl implements FireflyMetrics {
private final Meter accusations;
Expand Down Expand Up @@ -48,6 +47,7 @@ public class FireflyMetricsImpl extends EndpointMetricsImpl implements FireflyMe
private final Timer seedDuration;
private final Meter shunnedGossip;
private final Meter viewChanges;
private final Timer inboundEnjoinDuration;

public FireflyMetricsImpl(Digest context, MetricRegistry registry) {
super(registry);
Expand Down Expand Up @@ -80,6 +80,7 @@ public FireflyMetricsImpl(Digest context, MetricRegistry registry) {
shunnedGossip = registry.meter(name(context.shortString(), "ff.gossip.shunned"));
inboundSeed = registry.histogram(name(context.shortString(), "ff.seed.inbound.bytes"));
viewChanges = registry.meter(name(context.shortString(), "ff.view.change"));
inboundEnjoinDuration = registry.timer(name(context.shortString(), "ff.enjoin.duration"));
}

@Override
Expand All @@ -102,6 +103,11 @@ public Histogram gossipResponse() {
return gossipResponse;
}

@Override
public Timer inboundEnjoinDuration() {
return inboundEnjoinDuration;
}

@Override
public Histogram inboundGateway() {
return inboundGateway;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,10 @@ boolean setNote(NoteWrapper next) {

public class Service implements EntranceService, FFService, ServiceRouting {

public void enjoin(Join join, Digest from) {
viewManagement.enjoin(join, from);
}

/**
* Asynchronously add a member to the next view
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.salesforce.apollo.fireflies.proto.Update.Builder;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.ReservoirSampler;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
Expand Down Expand Up @@ -123,6 +124,55 @@ Digest currentView() {
return currentView.get();
}

void enjoin(Join join, Digest observer) {
final var joinView = Digest.from(join.getView());
final var from = observer;
if (!joined()) {
log.trace("Not joined, ignored enjoin of view: {} from: {} on: {}", joinView, from, node.getId());
throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription(
"Not joined, ignored join of view: %s from: %s on: %s".formatted(joinView, from, node.getId())));
}
var note = new NoteWrapper(join.getNote(), digestAlgo);
if (!view.validate(note.getIdentifier())) {
log.debug("Ignored enjoin of view: {} from: {} invalid identifier on: {}", joinView, from, node.getId());
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Invalid identifier"));
}
view.stable(() -> {
var thisView = currentView();
log.debug("Enjoin requested from: {} view: {} context: {} cardinality: {} on: {}", from, thisView,
context.getId(), cardinality(), node.getId());
if (contains(from)) {
log.debug("Already a member: {} view: {} context: {} cardinality: {} on: {}", from, thisView,
context.getId(), cardinality(), node.getId());
return;
}
if (!observers.contains(node.getId())) {
log.trace("Not observer, ignoring Join from: {} observers: {} on: {}", from, observers, node.getId());
throw new StatusRuntimeException(
Status.FAILED_PRECONDITION.withDescription("Not observer, ignored join of view"));
}
if (!thisView.equals(joinView)) {
throw new StatusRuntimeException(
Status.OUT_OF_RANGE.withDescription("View: " + joinView + " does not match: " + thisView));
}
if (!View.isValidMask(note.getMask(), context)) {
log.warn(
"Invalid enjoin mask: {} majority: {} from member: {} view: {} context: {} cardinality: {} on: {}",
note.getMask(), context.majority(), from, thisView, context.getId(), cardinality(), node.getId());
}
if (pendingJoins.size() >= params.maxPending()) {
throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("No room at the inn"));
}
pendingJoins.computeIfAbsent(from, d -> seeds -> {
log.info("Gateway established for: {} (enjoined) view: {} context: {} cardinality: {} on: {}", from,
currentView(), context.getId(), cardinality(), node.getId());
});
joins.put(note.getId(), note);
log.debug("Member pending enjoin: {} view: {} context: {} on: {}", from, currentView(), context.getId(),
node.getId());
});
}

void gc(Participant member) {
assert member != null;
view.stable(() -> {
Expand Down Expand Up @@ -286,9 +336,9 @@ void join(Join join, Digest from, StreamObserver<Gateway> responseObserver, Time
return;
}
if (!observers.contains(node.getId())) {
log.warn("Join not observer! from: {} observers: {} on: {}", from, observers, node.getId());
responseObserver.onNext(Gateway.getDefaultInstance());
return;
log.trace("Not observer, ignoring Join from: {} observers: {} on: {}", from, observers, node.getId());
responseObserver.onError(new StatusRuntimeException(
Status.FAILED_PRECONDITION.withDescription("Not observer, ignored join of view")));
}
if (!thisView.equals(joinView)) {
responseObserver.onError(new StatusRuntimeException(
Expand All @@ -313,6 +363,10 @@ void join(Join join, Digest from, StreamObserver<Gateway> responseObserver, Time
joins.put(note.getId(), note);
log.debug("Member pending join: {} view: {} context: {} on: {}", from, currentView(), context.getId(),
node.getId());
var enjoining = new SliceIterator<>("Enjoining[%s:%s]".formatted(currentView(), from), node,
observers.stream().map(context::getActiveMember).toList(), view.comm);
enjoining.iterate(t -> t.enjoin(join), (_, _, _, _) -> true, () -> {
}, Duration.ofMillis(1));
});
}

Expand Down Expand Up @@ -378,8 +432,8 @@ boolean joined() {
*/
void maybeViewChange() {
if (context.size() == 1 && joins.size() < context.getRingCount() - 1) {
log.info("Cannot form cluster: {} with: {} members, required:4 on: {}", currentView(),
joins.size() + context.size(), node.getId());
log.trace("Cannot form cluster: {} with: {} members, required > 3 on: {}", currentView(),
joins.size() + context.size(), node.getId());
view.scheduleViewChange();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.fireflies.FireflyMetrics;
import com.salesforce.apollo.fireflies.proto.FirefliesGrpc;
import com.salesforce.apollo.fireflies.proto.Gossip;
import com.salesforce.apollo.fireflies.proto.SayWhat;
import com.salesforce.apollo.fireflies.proto.State;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.membership.Member;

/**
Expand Down Expand Up @@ -42,6 +39,12 @@ public void close() {
channel.release();
}

@Override
public Void enjoin(Join join) {
channel.wrap(FirefliesGrpc.newFutureStub(channel)).enjoin(join);
return null;
}

@Override
public Member getMember() {
return channel.getMember();
Expand All @@ -63,10 +66,6 @@ public Gossip gossip(SayWhat sw) {
return result;
}

public void release() {
close();
}

@Override
public String toString() {
return String.format("->[%s]", channel.getMember());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@

import com.codahale.metrics.Timer.Context;
import com.google.protobuf.Empty;
import com.salesforce.apollo.fireflies.proto.FirefliesGrpc.FirefliesImplBase;
import com.salesforce.apollo.fireflies.proto.Gossip;
import com.salesforce.apollo.fireflies.proto.SayWhat;
import com.salesforce.apollo.fireflies.proto.State;
import com.salesforce.apollo.archipelago.RoutableService;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.fireflies.FireflyMetrics;
import com.salesforce.apollo.fireflies.View.Service;
import com.salesforce.apollo.fireflies.proto.FirefliesGrpc.FirefliesImplBase;
import com.salesforce.apollo.fireflies.proto.Gossip;
import com.salesforce.apollo.fireflies.proto.Join;
import com.salesforce.apollo.fireflies.proto.SayWhat;
import com.salesforce.apollo.fireflies.proto.State;
import com.salesforce.apollo.protocols.ClientIdentity;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

Expand All @@ -36,6 +36,29 @@ public FfServer(ClientIdentity identity, RoutableService<Service> r, FireflyMetr
this.router = r;
}

@Override
public void enjoin(Join request, StreamObserver<Empty> responseObserver) {
Context timer = metrics == null ? null : metrics.inboundEnjoinDuration().time();
if (metrics != null) {
var serializedSize = request.getSerializedSize();
metrics.inboundBandwidth().mark(serializedSize);
metrics.inboundGossip().update(serializedSize);
}
Digest from = identity.getFrom();
if (from == null) {
responseObserver.onError(new IllegalStateException("Member has been removed"));
return;
}
router.evaluate(responseObserver, s -> {
s.enjoin(request, from);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
if (timer != null) {
timer.stop();
}
});
}

@Override
public void gossip(SayWhat request, StreamObserver<Gossip> responseObserver) {
Context timer = metrics == null ? null : metrics.inboundGossipDuration().time();
Expand Down Expand Up @@ -99,5 +122,4 @@ public void update(State request, StreamObserver<Empty> responseObserver) {
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
*/
package com.salesforce.apollo.fireflies.comm.gossip;

import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.fireflies.View.Node;
import com.salesforce.apollo.fireflies.proto.Gossip;
import com.salesforce.apollo.fireflies.proto.Join;
import com.salesforce.apollo.fireflies.proto.SayWhat;
import com.salesforce.apollo.fireflies.proto.State;
import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.fireflies.View.Node;
import com.salesforce.apollo.membership.Member;

import java.io.IOException;
Expand All @@ -27,6 +28,11 @@ static Fireflies getLocalLoopback(Node node) {
public void close() throws IOException {
}

@Override
public Void enjoin(Join join) {
return null;
}

@Override
public Member getMember() {
return node;
Expand All @@ -43,6 +49,8 @@ public void update(State state) {
};
}

Void enjoin(Join join);

Gossip gossip(SayWhat sw);

void update(State state);
Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/fireflies.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package fireflies;
service Fireflies {
rpc gossip (SayWhat) returns (Gossip) {}
rpc update (State) returns (google.protobuf.Empty) {}
rpc enjoin (Join) returns (google.protobuf.Empty) {}
}

message SayWhat {
Expand Down

0 comments on commit eae25e7

Please sign in to comment.