Skip to content

Commit

Permalink
Fix exception behaviour for various failures. (#154)
Browse files Browse the repository at this point in the history
* Fix exception behaviour for various failures.
Add more details about the side effects behaviour.
* Rename SuspendedException into AbortedExecutionException.
slinkydeveloper authored Nov 21, 2023
1 parent 5b280e2 commit de4eceb
Showing 28 changed files with 573 additions and 267 deletions.
Original file line number Diff line number Diff line change
@@ -74,17 +74,7 @@ private static class ExecutorSwitchingSyscalls implements SyscallsInternal {

private ExecutorSwitchingSyscalls(SyscallsInternal syscalls, Executor syscallsExecutor) {
this.syscalls = syscalls;
this.syscallsExecutor =
r ->
syscallsExecutor.execute(
() -> {
try {
r.run();
} catch (Throwable e) {
syscalls.fail(e);
throw e;
}
});
this.syscallsExecutor = syscallsExecutor;
}

@Override
@@ -152,9 +142,10 @@ public void exitSideEffectBlock(ByteString toWrite, ExitSideEffectSyscallCallbac
}

@Override
public void exitSideEffectBlockWithException(
Throwable toWrite, ExitSideEffectSyscallCallback callback) {
syscallsExecutor.execute(() -> syscalls.exitSideEffectBlockWithException(toWrite, callback));
public void exitSideEffectBlockWithTerminalException(
TerminalException toWrite, ExitSideEffectSyscallCallback callback) {
syscallsExecutor.execute(
() -> syscalls.exitSideEffectBlockWithTerminalException(toWrite, callback));
}

@Override
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dev.restate.sdk.core.impl;

import dev.restate.sdk.core.SuspendedException;
import dev.restate.sdk.core.AbortedExecutionException;
import javax.annotation.Nullable;

class InputPublisherState {
@@ -12,7 +12,7 @@ void notifyClosed(Throwable cause) {
}

boolean isSuspended() {
return this.closeCause == SuspendedException.INSTANCE;
return this.closeCause == AbortedExecutionException.INSTANCE;
}

boolean isClosed() {
Original file line number Diff line number Diff line change
@@ -5,8 +5,8 @@
import com.google.rpc.Code;
import dev.restate.generated.sdk.java.Java;
import dev.restate.generated.service.protocol.Protocol;
import dev.restate.sdk.core.AbortedExecutionException;
import dev.restate.sdk.core.InvocationId;
import dev.restate.sdk.core.SuspendedException;
import dev.restate.sdk.core.impl.DeferredResults.CombinatorDeferredResult;
import dev.restate.sdk.core.impl.DeferredResults.ResolvableSingleDeferredResult;
import dev.restate.sdk.core.impl.DeferredResults.SingleDeferredResultInternal;
@@ -148,8 +148,8 @@ public void onError(Throwable throwable) {
@Override
public void onComplete() {
LOG.trace("Input publisher closed");
this.readyResultStateMachine.abort(SuspendedException.INSTANCE);
this.sideEffectAckStateMachine.abort(SuspendedException.INSTANCE);
this.readyResultStateMachine.abort(AbortedExecutionException.INSTANCE);
this.sideEffectAckStateMachine.abort(AbortedExecutionException.INSTANCE);
}

// --- Init routine to wait for the start message
@@ -260,7 +260,7 @@ <E extends MessageLite, T> void processCompletableJournalEntry(
SyscallCallback<DeferredResult<T>> callback) {
checkInsideSideEffectGuard();
if (this.state == State.CLOSED) {
callback.onCancel(SuspendedException.INSTANCE);
callback.onCancel(AbortedExecutionException.INSTANCE);
} else if (this.state == State.REPLAYING) {
// Retrieve the entry
this.readEntry(
@@ -331,7 +331,7 @@ <E extends MessageLite> void processJournalEntry(
E expectedEntryMessage, JournalEntry<E> journalEntry, SyscallCallback<Void> callback) {
checkInsideSideEffectGuard();
if (this.state == State.CLOSED) {
callback.onCancel(SuspendedException.INSTANCE);
callback.onCancel(AbortedExecutionException.INSTANCE);
} else if (this.state == State.REPLAYING) {
// Retrieve the entry
this.readEntry(
@@ -363,7 +363,7 @@ <E extends MessageLite> void processJournalEntry(
void enterSideEffectBlock(EnterSideEffectSyscallCallback callback) {
checkInsideSideEffectGuard();
if (this.state == State.CLOSED) {
callback.onCancel(SuspendedException.INSTANCE);
callback.onCancel(AbortedExecutionException.INSTANCE);
} else if (this.state == State.REPLAYING) {
// Retrieve the entry
this.readEntry(
@@ -390,7 +390,7 @@ void exitSideEffectBlock(
Java.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
this.insideSideEffect = false;
if (this.state == State.CLOSED) {
callback.onCancel(SuspendedException.INSTANCE);
callback.onCancel(AbortedExecutionException.INSTANCE);
} else if (this.state == State.REPLAYING) {
throw new IllegalStateException(
"exitSideEffect has been invoked when the state machine is in replaying mode. "
@@ -415,7 +415,7 @@ public void onLastSideEffectAck() {
@Override
public void onSuspend() {
writeSuspension(sideEffectAckStateMachine.getLastExecutedSideEffect());
callback.onCancel(SuspendedException.INSTANCE);
callback.onCancel(AbortedExecutionException.INSTANCE);
}

@Override
@@ -434,7 +434,7 @@ void completeSideEffectCallbackWithEntry(
if (sideEffectEntry.hasFailure()) {
callback.onFailure(Util.toRestateException(sideEffectEntry.getFailure()));
} else {
callback.onResult(sideEffectEntry.getValue());
callback.onSuccess(sideEffectEntry.getValue());
}
}

@@ -479,7 +479,7 @@ public boolean onNewReadyResult(Map<Integer, ReadyResultInternal<?>> resultMap)
@Override
public void onSuspend() {
writeSuspension(deferred.entryIndex());
callback.onCancel(SuspendedException.INSTANCE);
callback.onCancel(AbortedExecutionException.INSTANCE);
}

@Override
@@ -620,7 +620,7 @@ public boolean onNewReadyResult(Map<Integer, ReadyResultInternal<?>> resultMap)
@Override
public void onSuspend() {
writeSuspension(resolvableSingles.keySet());
callback.onCancel(SuspendedException.INSTANCE);
callback.onCancel(AbortedExecutionException.INSTANCE);
}

@Override
Original file line number Diff line number Diff line change
@@ -84,8 +84,7 @@ public InvocationHandler resolve(
SyscallsInternal syscalls =
syscallExecutor != null
? ExecutorSwitchingWrappers.syscalls(new SyscallsImpl(stateMachine), syscallExecutor)
// We still wrap with syscalls executor switching to exploit the error handling
: ExecutorSwitchingWrappers.syscalls(new SyscallsImpl(stateMachine), Runnable::run);
: new SyscallsImpl(stateMachine);
RestateServerCall bridge = new RestateServerCall(method.getMethodDescriptor(), syscalls);

return new InvocationHandler() {
Original file line number Diff line number Diff line change
@@ -101,7 +101,7 @@ public void close(Status status, Metadata trailers) {
if (status.getCause() != null) {
syscalls.fail(status.getCause());
} else {
// Just propagate cause
// Just propagate status
syscalls.fail(status.asRuntimeException());
}
}
Loading

0 comments on commit de4eceb

Please sign in to comment.