Skip to content

Commit

Permalink
run to CHOAM. 1 failure, 2 errors ;)
Browse files Browse the repository at this point in the history
progress.

finally.
  • Loading branch information
Hellblazer committed Sep 25, 2023
1 parent 40b88b7 commit 668bd88
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
Expand All @@ -44,7 +43,6 @@ public GorgoneionClient(ControlledIdentifierMember member, Function<SignedNonce,
}

public Validations apply(Duration timeout) {
var invitation = new CompletableFuture<Validations>();
KERL_ application = member.kerl();
var fs = client.apply(application, timeout);
Credentials credentials = credentials(fs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@
import com.salesforce.apollo.stereotomy.mem.MemKeyStore;
import com.salesforce.apollo.stereotomy.services.proto.ProtoEventObserver;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.security.SecureRandom;
import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -119,16 +125,23 @@ public void multiSmoke() throws Exception {
final var prefix = UUID.randomUUID().toString();
final var members = IntStream.range(0, 10).mapToObj(i -> new ControlledIdentifierMember(stereotomy.newIdentifier())).toList();

var countdown = new CountDownLatch(3);
// The kerl observer to publish admitted client KERLs to
var observer = mock(ProtoEventObserver.class);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
countdown.countDown();
return null;
}
}).when(observer).publish(Mockito.any(), Mockito.anyList());

var context = Context.<Member>newBuilder().setCardinality(members.size()).build();
for (ControlledIdentifierMember member : members) {
context.activate(member);
}
final var parameters = Parameters.newBuilder().setKerl(kerl).build();
final var exec = Executors.newVirtualThreadPerTaskExecutor();
@SuppressWarnings("unused") final var gorgons = members.stream().map(m -> {
members.stream().map(m -> {
final var router = new LocalServer(prefix, m, exec).router(ServerConnectionCache.newBuilder().setTarget(2),
exec);
router.start();
Expand Down Expand Up @@ -170,7 +183,6 @@ public void multiSmoke() throws Exception {
assertNotEquals(Validations.getDefaultInstance(), invitation);
assertTrue(invitation.getValidationsCount() >= context.majority());

// Verify client KERL published
verify(observer, times(3)).publish(client.kerl(), Collections.singletonList(invitation));
assertTrue(countdown.await(1, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,17 @@ private Validations register(Credentials request) {
member.getId());
}
}, scheduler, parameters.frequency());
return validated.thenApply(v -> {
notarize(request, v);
return v;
}).getNow(null);
try {
return validated.thenApply(v -> {
notarize(request, v);
return v;
}).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

private Validation_ validate(Credentials credentials) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ public KERLAdapter(ProtoKERLService kerl, DigestAlgorithm algorithm) {

@Override
public KeyState append(KeyEvent event) {
return new KeyStateImpl(kerl.append(Collections.singletonList(event.toKeyEvent_())).getFirst());
List<KeyState_> appended = kerl.append(Collections.singletonList(event.toKeyEvent_()));
if (appended.isEmpty()) {
return null;
}
KeyState_ published = appended.getFirst();
return published.equals(KeyState_.getDefaultInstance())
? null : new KeyStateImpl(published);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public CachingKEL(Function<Function<K, ?>, ?> kelSupplier) {
this(kelSupplier, defaultKsCoordsBuilder(), defaultEventCoordsBuilder());
}

public CachingKEL(Function<Function<K, ?>, ?> kelSupplier, Caffeine<EventCoordinates, KeyState> builder,
Caffeine<EventCoordinates, KeyEvent> eventBuilder) {
public CachingKEL(Function<Function<K, ?>, ?> kelSupplier, Caffeine<EventCoordinates, KeyState> builder, Caffeine<EventCoordinates, KeyEvent> eventBuilder) {
ksCoords = builder.build(new CacheLoader<EventCoordinates, KeyState>() {


Expand All @@ -71,26 +70,19 @@ public CachingKEL(Function<Function<K, ?>, ?> kelSupplier, Caffeine<EventCoordin
}

public static Caffeine<EventCoordinates, KeyEvent> defaultEventCoordsBuilder() {
return Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(Duration.ofMinutes(10))
.removalListener((EventCoordinates coords, KeyEvent e,
RemovalCause cause) -> log.trace("KeyEvent {} was removed ({})", coords,
cause));
return Caffeine.newBuilder().maximumSize(10_000).expireAfterWrite(Duration.ofMinutes(10)).removalListener((EventCoordinates coords, KeyEvent e, RemovalCause cause) -> log.trace("KeyEvent {} was removed ({})", coords, cause));
}

public static Caffeine<EventCoordinates, KeyState> defaultKsCoordsBuilder() {
return Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(Duration.ofMinutes(10))
.removalListener((EventCoordinates coords, KeyState ks,
RemovalCause cause) -> log.trace("KeyState {} was removed ({})", coords,
cause));
return Caffeine.newBuilder().maximumSize(10_000).expireAfterWrite(Duration.ofMinutes(10)).removalListener((EventCoordinates coords, KeyState ks, RemovalCause cause) -> log.trace("KeyState {} was removed ({})", coords, cause));
}

public KeyState append(KeyEvent event) {
try {
return complete(kel -> kel.append(event));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
} finally {
keyCoords.invalidate(event.getCoordinates());
}
Expand All @@ -103,6 +95,12 @@ public List<KeyState> append(KeyEvent... events) {
}
try {
return complete(kel -> kel.append(events));
} catch (ClassCastException e) {
log.error("Cannot complete append", e);
return null;
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
} finally {
for (var event : events) {
keyCoords.invalidate(event.getCoordinates());
Expand All @@ -115,17 +113,32 @@ public List<KeyState> append(List<KeyEvent> events, List<AttachmentEvent> attach
if (events.isEmpty() && attachments.isEmpty()) {
return Collections.emptyList();
}
return complete(kel -> kel.append(events, attachments));
try {
return complete(kel -> kel.append(events, attachments));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
}

@Override
public Attachment getAttachment(EventCoordinates coordinates) {
return complete(kel -> kel.getAttachment(coordinates));
try {
return complete(kel -> kel.getAttachment(coordinates));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
}

@Override
public DigestAlgorithm getDigestAlgorithm() {
return complete(kel -> kel.getDigestAlgorithm());
try {
return complete(kel -> kel.getDigestAlgorithm());
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
}

@Override
Expand All @@ -140,21 +153,46 @@ public KeyState getKeyState(EventCoordinates coordinates) {

@Override
public KeyState getKeyState(Identifier identifier) {
return complete(kel -> kel.getKeyState(identifier));
try {
return complete(kel -> kel.getKeyState(identifier));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
}

@Override
public KeyStateWithAttachments getKeyStateWithAttachments(EventCoordinates coordinates) {
return complete(kel -> kel.getKeyStateWithAttachments(coordinates));
try {
return complete(kel -> kel.getKeyStateWithAttachments(coordinates));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
}

@Override
public Verifier.DefaultVerifier getVerifier(KeyCoordinates coordinates) {
return complete(kel -> kel.getVerifier(coordinates));
try {
return complete(kel -> kel.getVerifier(coordinates));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
}

protected <T, I> T complete(Function<K, I> func) {
@SuppressWarnings("unchecked") final var result = (T) kelSupplier.apply(func);
return result;
try {
@SuppressWarnings("unchecked") final var result = (T) kelSupplier.apply(func);
return result;
} catch (Throwable t) {
log.error("Error completing cache", t);
return null;
}
}

public void clear() {
keyCoords.invalidateAll();
ksCoords.invalidateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.salesforce.apollo.stereotomy.event.AttachmentEvent;
import com.salesforce.apollo.stereotomy.event.KeyEvent;
import com.salesforce.apollo.stereotomy.identifier.Identifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
Expand All @@ -24,6 +26,7 @@
* @author hal.hildebrand
*/
public class CachingKERL extends CachingKEL<KERL> implements KERL {
private static final Logger log = LoggerFactory.getLogger(CachingKERL.class);

public CachingKERL(Function<Function<KERL, ?>, ?> kelSupplier) {
super(kelSupplier);
Expand All @@ -36,24 +39,43 @@ public CachingKERL(Function<Function<KERL, ?>, ?> kelSupplier, Caffeine<EventCoo

@Override
public Void append(List<AttachmentEvent> event) {
complete(kerl -> kerl.append(event));
try {
complete(kerl -> kerl.append(event));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
return null;
}

@Override
public Void appendValidations(EventCoordinates coordinates,
Map<EventCoordinates, JohnHancock> validations) {
return complete(kerl -> kerl.appendValidations(coordinates, validations));
try {
return complete(kerl -> kerl.appendValidations(coordinates, validations));
} catch (Throwable e) {
log.error("Cannot complete append", e);
return null;
}
}

@Override
public Map<EventCoordinates, JohnHancock> getValidations(EventCoordinates coordinates) {
return complete(kerl -> kerl.getValidations(coordinates));
try {
return complete(kerl -> kerl.getValidations(coordinates));
} catch (Throwable e) {
log.error("Cannot complete getValidations", e);
return null;
}
}

@Override
public List<EventWithAttachments> kerl(Identifier identifier) {
return complete(kerl -> kerl.kerl(identifier));
try {
return complete(kerl -> kerl.kerl(identifier));
} catch (Throwable e) {
log.error("Cannot complete kerl", e);
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory;
import com.salesforce.apollo.stereotomy.identifier.Identifier;
import com.salesforce.apollo.stereotomy.processing.KeyEventProcessor;
import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException;
import org.jooq.DSLContext;
import org.jooq.Record1;
import org.jooq.exception.DataAccessException;
Expand Down Expand Up @@ -163,12 +164,20 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState,

final var identBytes = event.getIdentifier().toIdent().toByteArray();

context.mergeInto(IDENTIFIER)
.using(context.selectOne())
.on(IDENTIFIER.PREFIX.eq(identBytes))
.whenNotMatchedThenInsert(IDENTIFIER.PREFIX)
.values(identBytes)
.execute();
try {
context.mergeInto(IDENTIFIER)
.using(context.selectOne())
.on(IDENTIFIER.PREFIX.eq(identBytes))
.whenNotMatchedThenInsert(IDENTIFIER.PREFIX)
.values(identBytes)
.execute();
} catch (DataAccessException e) {
if (e.getCause() instanceof JdbcSQLIntegrityConstraintViolationException icv) {
log.info("Constraint violation ignored: {}", icv.toString());
} else {
throw e;
}
}

var identifierId = context.select(IDENTIFIER.ID)
.from(IDENTIFIER)
Expand All @@ -186,6 +195,7 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState,
.fetchOne()
.value1();
} catch (DataAccessException e) {
log.info("already published: {} : {}", event.getCoordinates(), e.toString());
// Already exists
var coordinates = event.getCoordinates();
id = context.select(COORDINATES.ID)
Expand All @@ -210,8 +220,8 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState,
.execute();
} catch (DataAccessException e) {
// ignore
log.info("already inserted event: {} : {}",e, e.toString());
}
log.trace("Inserted event: {}", event);
context.mergeInto(CURRENT_KEY_STATE)
.using(context.selectOne())
.on(CURRENT_KEY_STATE.IDENTIFIER.eq(identifierId.value1()))
Expand All @@ -221,7 +231,7 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState,
.set(CURRENT_KEY_STATE.IDENTIFIER, identifierId.value1())
.set(CURRENT_KEY_STATE.CURRENT, id)
.execute();
log.trace("Inserted key state: {}", event);
log.info("Inserted key state: {}", event);
}

public static void appendAttachments(Connection connection, List<byte[]> attachments) {
Expand Down
Loading

0 comments on commit 668bd88

Please sign in to comment.