Skip to content

Commit

Permalink
handle unknown accusations, initiate view change under lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 22, 2024
1 parent 5b3084e commit 6d6bd30
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 27 deletions.
26 changes: 18 additions & 8 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ protected Gossip gossip(Fireflies link, int ring) {
case PERMISSION_DENIED:
log.trace("Rejected gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(),
node.getId());
accuse(p, ring, sre);
break;
case FAILED_PRECONDITION:
log.trace("Failed gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(),
Expand Down Expand Up @@ -744,11 +745,12 @@ private boolean add(AccusationWrapper accusation, Participant accuser, Participa
return false;
}

if (accused.isAccusedOn(accusation.getRingNumber())) {
Participant currentAccuser = context.getMember(
accused.getAccusation(accusation.getRingNumber()).getAccuser());
if (!currentAccuser.equals(accuser)) {
if (context.isBetween(accusation.getRingNumber(), currentAccuser, accuser, accused)) {
var acc = accused.getAccusation(accusation.getRingNumber());
if (acc != null) {
var currentAccuser = context.getMember(acc.getAccuser());
if (currentAccuser == null || !currentAccuser.equals(accuser)) {
if (currentAccuser == null || context.isBetween(accusation.getRingNumber(), currentAccuser, accuser,
accused)) {
if (!accused.verify(accusation.getSignature(),
accusation.getWrapped().getAccusation().toByteString())) {
log.trace("Accusation discarded, accusation by: {} accused:{} signature invalid on: {}",
Expand Down Expand Up @@ -782,8 +784,8 @@ private boolean add(AccusationWrapper accusation, Participant accuser, Participa
}
return false;
}
Participant predecessor = context.predecessor(accusation.getRingNumber(), accused,
m -> (!m.isAccused()) || (m.equals(accuser)));
var predecessor = context.predecessor(accusation.getRingNumber(), accused,
m -> (!m.isAccused()) || (m.equals(accuser)));
if (accuser.equals(predecessor)) {
accused.addAccusation(accusation);
if (!accused.equals(node) && !pendingRebuttals.containsKey(accused.getId())) {
Expand Down Expand Up @@ -1135,6 +1137,11 @@ private void handleSRE(String type, RingCommunications.Destination<Participant,
case PERMISSION_DENIED:
log.trace("Rejected: {}: {} view: {} from: {} on: {}", type, sre.getStatus(), currentView(), member.getId(),
node.getId());
accuse(member, destination.ring(), sre);
break;
case FAILED_PRECONDITION:
log.trace("Failed: {}: {} view: {} from: {} on: {}", type, sre.getStatus(), currentView(), member.getId(),
node.getId());
break;
case RESOURCE_EXHAUSTED:
log.trace("Unavailable for: {}: {} view: {} from: {} on: {}", type, sre.getStatus(), currentView(),
Expand All @@ -1159,6 +1166,9 @@ private void handleSRE(String type, RingCommunications.Destination<Participant,
*/
private void invalidate(Participant q, DynamicContextImpl.Ring<Participant> ring, Deque<Participant> check) {
AccusationWrapper qa = q.getAccusation(ring.getIndex());
if (qa == null) {
return;
}
Participant accuser = context.getMember(qa.getAccuser());
Participant accused = context.getMember(qa.getAccused());
if (ring.isBetween(accuser, q, accused)) {
Expand Down Expand Up @@ -1400,7 +1410,7 @@ private void validate(Digest from, final int ring, Digest requestView, String ty
log.debug("Invalid {}, view: {} current: {} ring: {} from: {} on: {}", type, requestView, currentView(),
ring, from, node.getId());
throw new StatusRuntimeException(
Status.PERMISSION_DENIED.withDescription("Invalid view: " + requestView + " current: " + currentView()));
Status.FAILED_PRECONDITION.withDescription("Invalid view: " + requestView + " current: " + currentView()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,21 +434,7 @@ void maybeViewChange() {
return;
}
if ((context.offlineCount() > 0 || !joins.isEmpty())) {
if (isObserver()) {
log.info("Initiating view change: {} (observer) joins: {} leaves: {} on: {}", currentView(),
joins.size(), view.streamShunned().count(), node.getId());
initiateViewChange();
} else {
// Use pending rebuttals as a proxy for stability
if (view.hasPendingRebuttals()) {
log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId());
view.scheduleViewChange(2); // 2 TTL round2 to check again
} else {
log.info("Initiating view change: {} (non observer) joins: {} leaves: {} on: {}", currentView(),
joins.size(), view.streamShunned().count(), node.getId());
view.scheduleFinalizeViewChange();
}
}
initiateViewChange();
} else {
// log.trace("No view change: {} joins: {} leaves: {} on: {}", currentView(), joins.size(),
// view.streamShunned().count(), node.getId());
Expand Down Expand Up @@ -574,7 +560,6 @@ void start(CompletableFuture<Void> onJoin, boolean bootstrap) {
* Initiate the view change
*/
private void initiateViewChange() {
assert isObserver() : "Not observer: " + node.getId();
view.stable(() -> {
if (vote.get() != null) {
log.trace("Vote already cast for: {} on: {}", currentView(), node.getId());
Expand All @@ -588,11 +573,12 @@ private void initiateViewChange() {
}
view.scheduleFinalizeViewChange();
if (!isObserver(node.getId())) {
log.warn("Initiating view change: {} (non observer) on: {}", currentView(), node.getId());
log.info("Initiating (non observer) view change: {} joins: {} leaves: {} on: {}", currentView(),
joins.size(), view.streamShunned().count(), node.getId());
return;
}
log.warn("Initiating view change vote: {} joins: {} leaves: {} observers: {} on: {}", currentView(),
joins.size(), view.streamShunned().count(), observersList(), node.getId());
log.warn("Initiating (observer) view change vote: {} joins: {} leaves: {} observers: {} on: {}",
currentView(), joins.size(), view.streamShunned().count(), observersList(), node.getId());
final var builder = ViewChange.newBuilder()
.setObserver(node.getId().toDigeste())
.setCurrent(currentView().toDigeste())
Expand Down

0 comments on commit 6d6bd30

Please sign in to comment.