From 9f13858108d48fe38af42e2cdc37b5f03949bc11 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 17 Mar 2026 13:45:33 -0700 Subject: [PATCH 1/2] separate context impl and interface --- docs/spec/waitForCondition.md | 2 +- .../lambda/durable/testing/TestOperation.java | 4 +- .../amazon/lambda/durable/BaseContext.java | 119 +-- .../amazon/lambda/durable/DurableContext.java | 688 +++-------------- .../lambda/durable/DurableExecutor.java | 4 +- .../amazon/lambda/durable/StepContext.java | 66 +- .../durable/context/BaseContextImpl.java | 137 ++++ .../durable/context/DurableContextImpl.java | 729 ++++++++++++++++++ .../durable/context/StepContextImpl.java | 72 ++ .../lambda/durable/logging/DurableLogger.java | 8 +- .../operation/BaseConcurrentOperation.java | 5 +- .../operation/BaseDurableOperation.java | 11 +- .../durable/operation/CallbackOperation.java | 4 +- .../operation/ChildContextOperation.java | 5 +- .../operation/ConcurrencyOperation.java | 8 +- .../durable/operation/InvokeOperation.java | 4 +- .../durable/operation/MapOperation.java | 4 +- .../durable/operation/ParallelOperation.java | 5 +- .../durable/operation/StepOperation.java | 4 +- .../operation/WaitForConditionOperation.java | 4 +- .../durable/operation/WaitOperation.java | 5 +- .../{ => util}/CompletedDurableFuture.java | 8 +- .../lambda/durable/DurableContextTest.java | 10 +- .../lambda/durable/ReplayValidationTest.java | 3 +- .../durable/logging/DurableLoggerTest.java | 6 +- .../operation/BaseDurableOperationTest.java | 6 +- .../operation/CallbackOperationTest.java | 6 +- .../operation/ChildContextOperationTest.java | 5 +- .../operation/ConcurrencyOperationTest.java | 9 +- .../operation/InvokeOperationTest.java | 6 +- .../operation/ParallelOperationTest.java | 6 +- .../durable/operation/StepOperationTest.java | 6 +- .../WaitForConditionOperationTest.java | 6 +- .../durable/operation/WaitOperationTest.java | 6 +- 34 files changed, 1153 insertions(+), 818 deletions(-) create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java rename sdk/src/main/java/software/amazon/lambda/durable/{ => util}/CompletedDurableFuture.java (68%) diff --git a/docs/spec/waitForCondition.md b/docs/spec/waitForCondition.md index 99f0d730a..89a3accb1 100644 --- a/docs/spec/waitForCondition.md +++ b/docs/spec/waitForCondition.md @@ -76,7 +76,7 @@ public interface WaitForConditionWaitStrategy { ``` - `state`: the current state returned by the check function -- `attempt`: 1-based attempt number (first check is attempt 1) +- `attempt`: 0-based attempt number (first check is attempt 0) - Returns a `WaitForConditionDecision` indicating whether to continue or stop ### WaitForConditionDecision diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestOperation.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestOperation.java index deb36bb9a..56b7e4897 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestOperation.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestOperation.java @@ -85,9 +85,9 @@ public ErrorObject getError() { return details != null ? details.error() : null; } - /** Returns the current retry attempt number (1-based), defaulting to 1 if not available. */ + /** Returns the current retry attempt number (0-based), defaulting to 0 if not available. */ public int getAttempt() { var details = operation.stepDetails(); - return details != null && details.attempt() != null ? details.attempt() : 1; + return details != null && details.attempt() != null ? details.attempt() : 0; } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java b/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java index 64e22a70e..0874e75c9 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java @@ -3,134 +3,49 @@ package software.amazon.lambda.durable; import com.amazonaws.services.lambda.runtime.Context; -import software.amazon.lambda.durable.execution.ExecutionManager; -import software.amazon.lambda.durable.execution.SuspendExecutionException; -import software.amazon.lambda.durable.execution.ThreadContext; -import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.logging.DurableLogger; -public abstract class BaseContext implements AutoCloseable { - private final ExecutionManager executionManager; - private final DurableConfig durableConfig; - private final Context lambdaContext; - private final ExecutionContext executionContext; - private final String contextId; - private final String contextName; - private final ThreadType threadType; - - private boolean isReplaying; - - /** - * Creates a new BaseContext instance. - * - * @param executionManager the execution manager for thread coordination and state management - * @param durableConfig the durable execution configuration - * @param lambdaContext the AWS Lambda runtime context - * @param contextId the context ID, null for root context, set for child contexts - * @param contextName the human-readable name for this context - * @param threadType the type of thread this context runs on - */ - protected BaseContext( - ExecutionManager executionManager, - DurableConfig durableConfig, - Context lambdaContext, - String contextId, - String contextName, - ThreadType threadType) { - this.executionManager = executionManager; - this.durableConfig = durableConfig; - this.lambdaContext = lambdaContext; - this.contextId = contextId; - this.contextName = contextName; - this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn()); - this.isReplaying = executionManager.hasOperationsForContext(contextId); - this.threadType = threadType; - - // write the thread id and type to thread local - executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType)); - } - - // =============== accessors ================ +public interface BaseContext extends AutoCloseable { /** * Gets a logger with additional information of the current execution context. * * @return a DurableLogger instance */ - public abstract DurableLogger getLogger(); + DurableLogger getLogger(); /** * Returns the AWS Lambda runtime context. * * @return the Lambda context */ - public Context getLambdaContext() { - return lambdaContext; - } + Context getLambdaContext(); /** - * Returns metadata about the current durable execution. - * - *

The execution context provides information that remains constant throughout the execution lifecycle, such as - * the durable execution ARN. This is useful for tracking execution progress, correlating logs, and referencing this - * execution in external systems. + * Returns the current durable execution arn * - * @return the execution context + * @return the execution arn */ - public ExecutionContext getExecutionContext() { - return executionContext; - } + String getExecutionArn(); /** * Returns the configuration for durable execution behavior. * * @return the durable configuration */ - public DurableConfig getDurableConfig() { - return durableConfig; - } + DurableConfig getDurableConfig(); - // ============= internal utilities =============== - - /** Gets the context ID for this context. Null for root context, set for child contexts. */ - public String getContextId() { - return contextId; - } - - public String getContextName() { - return contextName; - } + /** + * Gets the context ID for this context. Null for root context, operationId of the context operation for child + * contexts. + */ + String getContextId(); - public ExecutionManager getExecutionManager() { - return executionManager; - } + /** Gets the context name for this context. Null for root context. */ + String getContextName(); /** Returns whether this context is currently in replay mode. */ - boolean isReplaying() { - return isReplaying; - } - - /** - * Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered. - */ - void setExecutionMode() { - this.isReplaying = false; - } + boolean isReplaying(); - public void close() { - // this is called in the user thread, after the context's user code has completed - if (getContextId() != null) { - // if this is a child context or a step context, we need to - // deregister the context's thread from the execution manager - try { - executionManager.deregisterActiveThread(getContextId()); - } catch (SuspendExecutionException e) { - // Expected when this is the last active thread. Must catch here because: - // 1/ This runs in a worker thread detached from handlerFuture - // 2/ Uncaught exception would prevent stepAsync().get() from resume - // Suspension/Termination is already signaled via - // suspendExecutionFuture/terminateExecutionFuture - // before the throw. - } - } - } + /** Closes this context. */ + void close(); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index be4248733..d9c20292b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -2,106 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable; -import com.amazonaws.services.lambda.runtime.Context; import java.time.Duration; import java.util.Collection; -import java.util.List; -import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.lambda.model.OperationType; -import software.amazon.lambda.durable.execution.ExecutionManager; -import software.amazon.lambda.durable.execution.OperationIdGenerator; -import software.amazon.lambda.durable.execution.ThreadType; -import software.amazon.lambda.durable.logging.DurableLogger; import software.amazon.lambda.durable.model.MapResult; -import software.amazon.lambda.durable.model.OperationIdentifier; -import software.amazon.lambda.durable.model.OperationSubType; -import software.amazon.lambda.durable.operation.CallbackOperation; -import software.amazon.lambda.durable.operation.ChildContextOperation; -import software.amazon.lambda.durable.operation.InvokeOperation; -import software.amazon.lambda.durable.operation.MapOperation; -import software.amazon.lambda.durable.operation.ParallelOperation; -import software.amazon.lambda.durable.operation.StepOperation; -import software.amazon.lambda.durable.operation.WaitForConditionOperation; -import software.amazon.lambda.durable.operation.WaitOperation; -import software.amazon.lambda.durable.validation.ParameterValidator; - -/** - * User-facing API for defining durable operations within a workflow. - * - *

Provides methods for creating steps, waits, chained invokes, callbacks, and child contexts. Each method creates a - * checkpoint-backed operation that survives Lambda interruptions. - */ -public class DurableContext extends BaseContext { - private static final String WAIT_FOR_CALLBACK_CALLBACK_SUFFIX = "-callback"; - private static final String WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX = "-submitter"; - private static final int MAX_WAIT_FOR_CALLBACK_NAME_LENGTH = ParameterValidator.MAX_OPERATION_NAME_LENGTH - - Math.max(WAIT_FOR_CALLBACK_CALLBACK_SUFFIX.length(), WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX.length()); - private final OperationIdGenerator operationIdGenerator; - private volatile DurableLogger logger; - - /** Shared initialization — sets all fields. */ - private DurableContext( - ExecutionManager executionManager, - DurableConfig durableConfig, - Context lambdaContext, - String contextId, - String contextName) { - super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT); - operationIdGenerator = new OperationIdGenerator(contextId); - } - - /** - * Creates a root context (contextId = null) - * - *

The context itself always has a null contextId (making it a root context). - * - * @param executionManager the execution manager - * @param durableConfig the durable configuration - * @param lambdaContext the Lambda context - * @return a new root DurableContext - */ - public static DurableContext createRootContext( - ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) { - return new DurableContext(executionManager, durableConfig, lambdaContext, null, null); - } - - /** - * Creates a child context. - * - * @param childContextId the child context's ID (the CONTEXT operation's operation ID) - * @param childContextName the name of the child context - * @return a new DurableContext for the child context - */ - public DurableContext createChildContext(String childContextId, String childContextName) { - return new DurableContext( - getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName); - } - - /** - * Creates a step context for executing step operations. - * - * @param stepOperationId the ID of the step operation (used for thread registration) - * @param stepOperationName the name of the step operation - * @param attempt the current retry attempt number (1-based) - * @return a new StepContext instance - */ - public StepContext createStepContext(String stepOperationId, String stepOperationName, int attempt) { - return new StepContext( - getExecutionManager(), - getDurableConfig(), - getLambdaContext(), - stepOperationId, - stepOperationName, - attempt); - } - - // ========== step methods ========== +public interface DurableContext extends BaseContext { /** * Executes a durable step with the given name and blocks until it completes. * @@ -114,9 +23,7 @@ public StepContext createStepContext(String stepOperationId, String stepOperatio * @param func the function to execute, receiving a {@link StepContext} * @return the step result */ - public T step(String name, Class resultType, Function func) { - return step(name, TypeToken.get(resultType), func, StepConfig.builder().build()); - } + T step(String name, Class resultType, Function func); /** * Executes a durable step with the given name and configuration, blocking until it completes. @@ -128,10 +35,7 @@ public T step(String name, Class resultType, Function fun * @param config the step configuration (retry strategy, semantics, custom SerDes) * @return the step result */ - public T step(String name, Class resultType, Function func, StepConfig config) { - // Simply delegate to stepAsync and block on the result - return stepAsync(name, resultType, func, config).get(); - } + T step(String name, Class resultType, Function func, StepConfig config); /** * Executes a durable step using a {@link TypeToken} for generic result types, blocking until it completes. @@ -142,9 +46,7 @@ public T step(String name, Class resultType, Function fun * @param func the function to execute, receiving a {@link StepContext} * @return the step result */ - public T step(String name, TypeToken typeToken, Function func) { - return step(name, typeToken, func, StepConfig.builder().build()); - } + T step(String name, TypeToken typeToken, Function func); /** * Executes a durable step using a {@link TypeToken} and configuration, blocking until it completes. @@ -156,10 +58,7 @@ public T step(String name, TypeToken typeToken, Function * @param config the step configuration (retry strategy, semantics, custom SerDes) * @return the step result */ - public T step(String name, TypeToken typeToken, Function func, StepConfig config) { - // Simply delegate to stepAsync and block on the result - return stepAsync(name, typeToken, func, config).get(); - } + T step(String name, TypeToken typeToken, Function func, StepConfig config); /** * Asynchronously executes a durable step, returning a {@link DurableFuture} that can be composed or blocked on. @@ -170,111 +69,72 @@ public T step(String name, TypeToken typeToken, Function * @param func the function to execute, receiving a {@link StepContext} * @return a future representing the step result */ - public DurableFuture stepAsync(String name, Class resultType, Function func) { - return stepAsync( - name, TypeToken.get(resultType), func, StepConfig.builder().build()); - } - - /** Asynchronously executes a durable step with custom configuration. */ - public DurableFuture stepAsync( - String name, Class resultType, Function func, StepConfig config) { - return stepAsync(name, TypeToken.get(resultType), func, config); - } - - /** Asynchronously executes a durable step using a {@link TypeToken} for generic result types. */ - public DurableFuture stepAsync(String name, TypeToken typeToken, Function func) { - return stepAsync(name, typeToken, func, StepConfig.builder().build()); - } + DurableFuture stepAsync(String name, Class resultType, Function func); /** - * Asynchronously executes a durable step using a {@link TypeToken} and custom configuration. + * Asynchronously executes a durable step using custom configuration. * *

This is the core stepAsync implementation. All other step/stepAsync overloads delegate here. * * @param the result type * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic types * @param func the function to execute, receiving a {@link StepContext} * @param config the step configuration (retry strategy, semantics, custom SerDes) * @return a future representing the step result */ - public DurableFuture stepAsync( - String name, TypeToken typeToken, Function func, StepConfig config) { - Objects.requireNonNull(config, "config cannot be null"); - Objects.requireNonNull(typeToken, "typeToken cannot be null"); - ParameterValidator.validateOperationName(name); + DurableFuture stepAsync(String name, Class resultType, Function func, StepConfig config); - if (config.serDes() == null) { - config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); - } - var operationId = nextOperationId(); - - // Create and start step operation with TypeToken - var operation = new StepOperation<>( - OperationIdentifier.of(operationId, name, OperationType.STEP), func, typeToken, config, this); - - operation.execute(); // Start the step (returns immediately) + /** + * Asynchronously executes a durable step using a {@link TypeToken} for generic result types. + * + *

This is the core stepAsync implementation. All other step/stepAsync overloads delegate here. + * + * @param the result type + * @param name the unique operation name within this context + * @param typeToken the type token for deserialization of generic types + * @param func the function to execute, receiving a {@link StepContext} + * @return a future representing the step result + */ + DurableFuture stepAsync(String name, TypeToken typeToken, Function func); - return operation; - } + /** + * Asynchronously executes a durable step using a {@link TypeToken} and custom configuration. + * + *

This is the core stepAsync implementation. All other step/stepAsync overloads delegate here. + * + * @param the result type + * @param name the unique operation name within this context + * @param typeToken the type token for deserialization of generic types + * @param func the function to execute, receiving a {@link StepContext} + * @param config the step configuration (retry strategy, semantics, custom SerDes) + * @return a future representing the step result + */ + DurableFuture stepAsync( + String name, TypeToken typeToken, Function func, StepConfig config); - /** @deprecated use the variants accepting StepContext instead */ @Deprecated - public T step(String name, Class resultType, Supplier func) { - return stepAsync( - name, - TypeToken.get(resultType), - func, - StepConfig.builder().build()) - .get(); - } - - /** @deprecated use the variants accepting StepContext instead */ - @Deprecated - public T step(String name, Class resultType, Supplier func, StepConfig config) { - // Simply delegate to stepAsync and block on the result - return stepAsync(name, TypeToken.get(resultType), func, config).get(); - } + T step(String name, Class resultType, Supplier func); - /** @deprecated use the variants accepting StepContext instead */ @Deprecated - public T step(String name, TypeToken typeToken, Supplier func) { - return stepAsync(name, typeToken, func, StepConfig.builder().build()).get(); - } + T step(String name, Class resultType, Supplier func, StepConfig config); - /** @deprecated use the variants accepting StepContext instead */ @Deprecated - public T step(String name, TypeToken typeToken, Supplier func, StepConfig config) { - // Simply delegate to stepAsync and block on the result - return stepAsync(name, typeToken, func, config).get(); - } + T step(String name, TypeToken typeToken, Supplier func); - /** @deprecated use the variants accepting StepContext instead */ @Deprecated - public DurableFuture stepAsync(String name, Class resultType, Supplier func) { - return stepAsync( - name, TypeToken.get(resultType), func, StepConfig.builder().build()); - } + T step(String name, TypeToken typeToken, Supplier func, StepConfig config); - /** @deprecated use the variants accepting StepContext instead */ @Deprecated - public DurableFuture stepAsync(String name, Class resultType, Supplier func, StepConfig config) { - return stepAsync(name, TypeToken.get(resultType), func, config); - } + DurableFuture stepAsync(String name, Class resultType, Supplier func); - /** @deprecated use the variants accepting StepContext instead */ @Deprecated - public DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func) { - return stepAsync(name, typeToken, func, StepConfig.builder().build()); - } + DurableFuture stepAsync(String name, Class resultType, Supplier func, StepConfig config); - /** @deprecated use the variants accepting StepContext instead */ @Deprecated - public DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func, StepConfig config) { - return stepAsync(name, typeToken, stepContext -> func.get(), config); - } + DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func); - // ========== wait methods ========== + @Deprecated + DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func, StepConfig config); /** * Suspends execution for the specified duration without consuming compute resources. @@ -286,10 +146,7 @@ public DurableFuture stepAsync(String name, TypeToken typeToken, Suppl * @param duration the duration to wait * @return always {@code null} */ - public Void wait(String name, Duration duration) { - // Block (will throw SuspendExecutionException if there is no active thread) - return waitAsync(name, duration).get(); - } + Void wait(String name, Duration duration); /** * Asynchronously suspends execution for the specified duration. @@ -298,21 +155,7 @@ public Void wait(String name, Duration duration) { * @param duration the duration to wait * @return a future that completes when the wait duration has elapsed */ - public DurableFuture waitAsync(String name, Duration duration) { - ParameterValidator.validateDuration(duration, "Wait duration"); - ParameterValidator.validateOperationName(name); - - var operationId = nextOperationId(); - - // Create and start wait operation - var operation = - new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this); - - operation.execute(); // Checkpoint the wait - return operation; - } - - // ========== chained invoke methods ========== + DurableFuture waitAsync(String name, Duration duration); /** * Invokes another Lambda function by name and blocks until the result is available. @@ -328,59 +171,25 @@ public DurableFuture waitAsync(String name, Duration duration) { * @param resultType the result class for deserialization * @return the invocation result */ - public T invoke(String name, String functionName, U payload, Class resultType) { - return invokeAsync( - name, - functionName, - payload, - TypeToken.get(resultType), - InvokeConfig.builder().build()) - .get(); - } + T invoke(String name, String functionName, U payload, Class resultType); /** Invokes another Lambda function with custom configuration, blocking until the result is available. */ - public T invoke(String name, String functionName, U payload, Class resultType, InvokeConfig config) { - return invokeAsync(name, functionName, payload, TypeToken.get(resultType), config) - .get(); - } + T invoke(String name, String functionName, U payload, Class resultType, InvokeConfig config); /** Invokes another Lambda function using a {@link TypeToken} for generic result types, blocking until complete. */ - public T invoke(String name, String functionName, U payload, TypeToken typeToken) { - return invokeAsync( - name, - functionName, - payload, - typeToken, - InvokeConfig.builder().build()) - .get(); - } + T invoke(String name, String functionName, U payload, TypeToken typeToken); - /** Invokes another Lambda function using a {@link TypeToken} and custom configuration, blocking until complete. */ - public T invoke(String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config) { - return invokeAsync(name, functionName, payload, typeToken, config).get(); - } + T invoke(String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config); - /** Asynchronously invokes another Lambda function with custom configuration. */ - public DurableFuture invokeAsync( - String name, String functionName, U payload, Class resultType, InvokeConfig config) { - return invokeAsync(name, functionName, payload, TypeToken.get(resultType), config); - } + /** Invokes another Lambda function using a {@link TypeToken} and custom configuration, blocking until complete. */ + DurableFuture invokeAsync( + String name, String functionName, U payload, Class resultType, InvokeConfig config); /** Asynchronously invokes another Lambda function, returning a {@link DurableFuture}. */ - public DurableFuture invokeAsync(String name, String functionName, U payload, Class resultType) { - return invokeAsync( - name, - functionName, - payload, - TypeToken.get(resultType), - InvokeConfig.builder().build()); - } + DurableFuture invokeAsync(String name, String functionName, U payload, Class resultType); /** Asynchronously invokes another Lambda function using a {@link TypeToken} for generic result types. */ - public DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType) { - return invokeAsync( - name, functionName, payload, resultType, InvokeConfig.builder().build()); - } + DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType); /** * Asynchronously invokes another Lambda function using a {@link TypeToken} and custom configuration. @@ -396,52 +205,17 @@ public DurableFuture invokeAsync(String name, String functionName, U p * @param config the invoke configuration (custom SerDes for result and payload) * @return a future representing the invocation result */ - public DurableFuture invokeAsync( - String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config) { - Objects.requireNonNull(config, "config cannot be null"); - Objects.requireNonNull(typeToken, "typeToken cannot be null"); - ParameterValidator.validateOperationName(name); - - if (config.serDes() == null) { - config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); - } - if (config.payloadSerDes() == null) { - config = config.toBuilder() - .payloadSerDes(getDurableConfig().getSerDes()) - .build(); - } - var operationId = nextOperationId(); - - // Create and start invoke operation - var operation = new InvokeOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE), - functionName, - payload, - typeToken, - config, - this); - - operation.execute(); // checkpoint the invoke operation - return operation; // Block (will throw SuspendExecutionException if needed) - } - - // ========== createCallback methods ========== + DurableFuture invokeAsync( + String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config); /** Creates a callback with custom configuration. */ - public DurableCallbackFuture createCallback(String name, Class resultType, CallbackConfig config) { - return createCallback(name, TypeToken.get(resultType), config); - } + DurableCallbackFuture createCallback(String name, Class resultType, CallbackConfig config); /** Creates a callback using a {@link TypeToken} for generic result types. */ - public DurableCallbackFuture createCallback(String name, TypeToken typeToken) { - return createCallback(name, typeToken, CallbackConfig.builder().build()); - } + DurableCallbackFuture createCallback(String name, TypeToken typeToken); /** Creates a callback with default configuration. */ - public DurableCallbackFuture createCallback(String name, Class resultType) { - return createCallback( - name, TypeToken.get(resultType), CallbackConfig.builder().build()); - } + DurableCallbackFuture createCallback(String name, Class resultType); /** * Creates a callback operation that suspends execution until an external system completes it. @@ -455,21 +229,7 @@ public DurableCallbackFuture createCallback(String name, Class resultT * @param config the callback configuration (custom SerDes) * @return a future containing the callback ID and eventual result */ - public DurableCallbackFuture createCallback(String name, TypeToken typeToken, CallbackConfig config) { - ParameterValidator.validateOperationName(name); - if (config.serDes() == null) { - config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); - } - var operationId = nextOperationId(); - - var operation = new CallbackOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CALLBACK), typeToken, config, this); - operation.execute(); - - return operation; - } - - // ========== runInChildContext methods ========== + DurableCallbackFuture createCallback(String name, TypeToken typeToken, CallbackConfig config); /** * Runs a function in a child context, blocking until it completes. @@ -483,123 +243,40 @@ public DurableCallbackFuture createCallback(String name, TypeToken typ * @param func the function to execute, receiving a child {@link DurableContext} * @return the child context result */ - public T runInChildContext(String name, Class resultType, Function func) { - return runInChildContextAsync(name, TypeToken.get(resultType), func).get(); - } + T runInChildContext(String name, Class resultType, Function func); /** * Runs a function in a child context using a {@link TypeToken} for generic result types, blocking until complete. */ - public T runInChildContext(String name, TypeToken typeToken, Function func) { - return runInChildContextAsync(name, typeToken, func).get(); - } + T runInChildContext(String name, TypeToken typeToken, Function func); /** Asynchronously runs a function in a child context, returning a {@link DurableFuture}. */ - public DurableFuture runInChildContextAsync( - String name, Class resultType, Function func) { - return runInChildContextAsync(name, TypeToken.get(resultType), func); - } + DurableFuture runInChildContextAsync(String name, Class resultType, Function func); /** Asynchronously runs a function in a child context using a {@link TypeToken} for generic result types. */ - public DurableFuture runInChildContextAsync( - String name, TypeToken typeToken, Function func) { - return runInChildContextAsync(name, typeToken, func, OperationSubType.RUN_IN_CHILD_CONTEXT); - } - - private DurableFuture runInChildContextAsync( - String name, TypeToken typeToken, Function func, OperationSubType subType) { - Objects.requireNonNull(typeToken, "typeToken cannot be null"); - ParameterValidator.validateOperationName(name); - var operationId = nextOperationId(); - - var operation = new ChildContextOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType), - func, - typeToken, - getDurableConfig().getSerDes(), - this); - - operation.execute(); - return operation; - } - - // ========== map methods ========== - - public MapResult map(String name, Collection items, Class resultType, MapFunction function) { - return mapAsync( - name, - items, - TypeToken.get(resultType), - function, - MapConfig.builder().build()) - .get(); - } - - public MapResult map( - String name, Collection items, Class resultType, MapFunction function, MapConfig config) { - return mapAsync(name, items, TypeToken.get(resultType), function, config) - .get(); - } - - public MapResult map( - String name, Collection items, TypeToken resultType, MapFunction function) { - return mapAsync(name, items, resultType, function, MapConfig.builder().build()) - .get(); - } - - public MapResult map( - String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config) { - return mapAsync(name, items, resultType, function, config).get(); - } - - public DurableFuture> mapAsync( - String name, Collection items, Class resultType, MapFunction function) { - return mapAsync( - name, - items, - TypeToken.get(resultType), - function, - MapConfig.builder().build()); - } - - public DurableFuture> mapAsync( - String name, Collection items, Class resultType, MapFunction function, MapConfig config) { - return mapAsync(name, items, TypeToken.get(resultType), function, config); - } - - public DurableFuture> mapAsync( - String name, Collection items, TypeToken resultType, MapFunction function) { - return mapAsync(name, items, resultType, function, MapConfig.builder().build()); - } - - public DurableFuture> mapAsync( - String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config) { - Objects.requireNonNull(items, "items cannot be null"); - Objects.requireNonNull(function, "function cannot be null"); - Objects.requireNonNull(resultType, "resultType cannot be null"); - Objects.requireNonNull(config, "config cannot be null"); - ParameterValidator.validateOperationName(name); - ParameterValidator.validateOrderedCollection(items); - - if (config.serDes() == null) { - config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); - } - - // Short-circuit for empty collections — no checkpoint overhead - if (items.isEmpty()) { - return new CompletedDurableFuture<>(MapResult.empty()); - } - - // Convert to List for deterministic index-based access - var itemList = List.copyOf(items); - var operationId = nextOperationId(); - - var operation = new MapOperation<>(operationId, name, itemList, function, resultType, config, this); - operation.execute(); - return operation; - } - - // ========== parallel methods ========== + DurableFuture runInChildContextAsync(String name, TypeToken typeToken, Function func); + + MapResult map(String name, Collection items, Class resultType, MapFunction function); + + MapResult map( + String name, Collection items, Class resultType, MapFunction function, MapConfig config); + + MapResult map(String name, Collection items, TypeToken resultType, MapFunction function); + + MapResult map( + String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config); + + DurableFuture> mapAsync( + String name, Collection items, Class resultType, MapFunction function); + + DurableFuture> mapAsync( + String name, Collection items, Class resultType, MapFunction function, MapConfig config); + + DurableFuture> mapAsync( + String name, Collection items, TypeToken resultType, MapFunction function); + + DurableFuture> mapAsync( + String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config); /** * Creates a {@link ParallelContext} for executing multiple branches concurrently. @@ -607,25 +284,7 @@ public DurableFuture> mapAsync( * @param config the parallel execution configuration * @return a new ParallelContext for registering and executing branches */ - public ParallelContext parallel(String name, ParallelConfig config) { - Objects.requireNonNull(config, "config cannot be null"); - var operationId = nextOperationId(); - - var parallelOp = new ParallelOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL), - TypeToken.get(Void.class), - getDurableConfig().getSerDes(), - this, - config.maxConcurrency(), - config.minSuccessful(), - config.toleratedFailureCount()); - - parallelOp.execute(); - - return new ParallelContext(parallelOp, this); - } - - // ========= waitForCallback methods ============= + ParallelContext parallel(String name, ParallelConfig config); /** * Executes a submitter function and waits for an external callback, blocking until the callback completes. @@ -639,67 +298,38 @@ public ParallelContext parallel(String name, ParallelConfig config) { * @param func the submitter function, receiving the callback ID and a {@link StepContext} * @return the callback result */ - public T waitForCallback(String name, Class resultType, BiConsumer func) { - return waitForCallbackAsync( - name, - TypeToken.get(resultType), - func, - WaitForCallbackConfig.builder().build()) - .get(); - } + T waitForCallback(String name, Class resultType, BiConsumer func); /** Executes a submitter and waits for an external callback using a {@link TypeToken}, blocking until complete. */ - public T waitForCallback(String name, TypeToken typeToken, BiConsumer func) { - return waitForCallbackAsync( - name, typeToken, func, WaitForCallbackConfig.builder().build()) - .get(); - } + T waitForCallback(String name, TypeToken typeToken, BiConsumer func); /** Executes a submitter and waits for an external callback with custom configuration, blocking until complete. */ - public T waitForCallback( + T waitForCallback( String name, Class resultType, BiConsumer func, - WaitForCallbackConfig waitForCallbackConfig) { - return waitForCallbackAsync(name, TypeToken.get(resultType), func, waitForCallbackConfig) - .get(); - } + WaitForCallbackConfig waitForCallbackConfig); /** Executes a submitter and waits for an external callback using a {@link TypeToken} and custom configuration. */ - public T waitForCallback( + T waitForCallback( String name, TypeToken typeToken, BiConsumer func, - WaitForCallbackConfig waitForCallbackConfig) { - return waitForCallbackAsync(name, typeToken, func, waitForCallbackConfig) - .get(); - } + WaitForCallbackConfig waitForCallbackConfig); /** Asynchronously executes a submitter and waits for an external callback. */ - public DurableFuture waitForCallbackAsync( - String name, Class resultType, BiConsumer func) { - return waitForCallbackAsync( - name, - TypeToken.get(resultType), - func, - WaitForCallbackConfig.builder().build()); - } + DurableFuture waitForCallbackAsync(String name, Class resultType, BiConsumer func); /** Asynchronously executes a submitter and waits for an external callback using a {@link TypeToken}. */ - public DurableFuture waitForCallbackAsync( - String name, TypeToken typeToken, BiConsumer func) { - return waitForCallbackAsync( - name, typeToken, func, WaitForCallbackConfig.builder().build()); - } + DurableFuture waitForCallbackAsync( + String name, TypeToken typeToken, BiConsumer func); /** Asynchronously executes a submitter and waits for an external callback with custom configuration. */ - public DurableFuture waitForCallbackAsync( + DurableFuture waitForCallbackAsync( String name, Class resultType, BiConsumer func, - WaitForCallbackConfig waitForCallbackConfig) { - return waitForCallbackAsync(name, TypeToken.get(resultType), func, waitForCallbackConfig); - } + WaitForCallbackConfig waitForCallbackConfig); /** * Asynchronously executes a submitter and waits for an external callback using a {@link TypeToken} and custom @@ -716,133 +346,33 @@ public DurableFuture waitForCallbackAsync( * @param waitForCallbackConfig the configuration for both the callback and submitter step * @return a future representing the callback result */ - public DurableFuture waitForCallbackAsync( + DurableFuture waitForCallbackAsync( String name, TypeToken typeToken, BiConsumer func, - WaitForCallbackConfig waitForCallbackConfig) { - Objects.requireNonNull(typeToken, "typeToken cannot be null"); - Objects.requireNonNull(waitForCallbackConfig, "waitForCallbackConfig cannot be null"); - // waitForCallback adds a suffix for the callback operation name and the submitter operation name so - // the length restriction of waitForCallback name is different from the other operations. - ParameterValidator.validateOperationName(name, MAX_WAIT_FOR_CALLBACK_NAME_LENGTH); - - var finalWaitForCallbackConfig = waitForCallbackConfig.stepConfig().serDes() == null - ? waitForCallbackConfig.toBuilder() - .stepConfig(waitForCallbackConfig.stepConfig().toBuilder() - .serDes(getDurableConfig().getSerDes()) - .build()) - .build() - : waitForCallbackConfig; - - return runInChildContextAsync( - name, - typeToken, - childCtx -> { - var callback = childCtx.createCallback( - name + WAIT_FOR_CALLBACK_CALLBACK_SUFFIX, - typeToken, - finalWaitForCallbackConfig.callbackConfig()); - childCtx.step( - name + WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX, - Void.class, - stepCtx -> { - func.accept(callback.callbackId(), stepCtx); - return null; - }, - finalWaitForCallbackConfig.stepConfig()); - return callback.get(); - }, - OperationSubType.WAIT_FOR_CALLBACK); - } - - // ========== waitForCondition methods ========== - - public T waitForCondition( + WaitForCallbackConfig waitForCallbackConfig); + + T waitForCondition( String name, Class resultType, BiFunction checkFunc, - WaitForConditionConfig config) { - return waitForConditionAsync(name, TypeToken.get(resultType), checkFunc, config) - .get(); - } + WaitForConditionConfig config); - public T waitForCondition( + T waitForCondition( String name, TypeToken typeToken, BiFunction checkFunc, - WaitForConditionConfig config) { - return waitForConditionAsync(name, typeToken, checkFunc, config).get(); - } + WaitForConditionConfig config); - public DurableFuture waitForConditionAsync( + DurableFuture waitForConditionAsync( String name, Class resultType, BiFunction checkFunc, - WaitForConditionConfig config) { - return waitForConditionAsync(name, TypeToken.get(resultType), checkFunc, config); - } + WaitForConditionConfig config); - public DurableFuture waitForConditionAsync( + DurableFuture waitForConditionAsync( String name, TypeToken typeToken, BiFunction checkFunc, - WaitForConditionConfig config) { - Objects.requireNonNull(config, "config cannot be null"); - Objects.requireNonNull(typeToken, "typeToken cannot be null"); - ParameterValidator.validateOperationName(name); - - if (config.serDes() == null) { - config = WaitForConditionConfig.builder(config.waitStrategy(), config.initialState()) - .serDes(getDurableConfig().getSerDes()) - .build(); - } - var operationId = nextOperationId(); - - var operation = new WaitForConditionOperation<>(operationId, name, checkFunc, typeToken, config, this); - - operation.execute(); - - return operation; - } - - // =============== accessors ================ - /** - * Returns a logger with execution context information for replay-aware logging. - * - * @return the durable logger - */ - public DurableLogger getLogger() { - // lazy initialize logger - if (logger == null) { - synchronized (this) { - if (logger == null) { - logger = new DurableLogger(LoggerFactory.getLogger(DurableContext.class), this); - } - } - } - return logger; - } - - /** - * Clears the logger's thread properties. Called during context destruction to prevent memory leaks and ensure clean - * state for subsequent executions. - */ - @Override - public void close() { - if (logger != null) { - logger.close(); - } - super.close(); - } - - /** - * Get the next operationId. Returns a globally unique operation ID by hashing a sequential operation counter. For - * root contexts, the counter value is hashed directly (e.g. "1", "2", "3"). For child contexts, the values are - * prefixed with the parent hashed contextId (e.g. "-1", "-2" inside parent context ). This - * matches the Python SDK's stepPrefix convention and prevents ID collisions in checkpoint batches. - */ - private String nextOperationId() { - return operationIdGenerator.nextOperationId(); - } + WaitForConditionConfig config); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java index 9b763dba4..721c8d5d6 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java @@ -14,6 +14,7 @@ import software.amazon.awssdk.services.lambda.model.OperationAction; import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.DurableOperationException; import software.amazon.lambda.durable.exception.IllegalDurableOperationException; import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; @@ -49,7 +50,8 @@ public static DurableExecutionOutput execute( var userInput = extractUserInput( executionManager.getExecutionOperation(), config.getSerDes(), inputType); // use try-with-resources to clear logger properties - try (var context = DurableContext.createRootContext(executionManager, config, lambdaContext)) { + try (var context = + DurableContextImpl.createRootContext(executionManager, config, lambdaContext)) { return handler.apply(userInput, context); } }, diff --git a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java index 3fe16ab21..548e335c1 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java @@ -2,67 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable; -import com.amazonaws.services.lambda.runtime.Context; -import org.slf4j.LoggerFactory; -import software.amazon.lambda.durable.execution.ExecutionManager; -import software.amazon.lambda.durable.execution.ThreadType; -import software.amazon.lambda.durable.logging.DurableLogger; - -/** - * Context available inside a step operation's user function. - * - *

Provides access to the current retry attempt number and a logger that includes execution metadata. Extends - * {@link BaseContext} for thread lifecycle management. - */ -public class StepContext extends BaseContext { - private volatile DurableLogger logger; - private final int attempt; - - /** - * Creates a new StepContext instance for use in step operations. - * - * @param executionManager Manages durable execution state and operations - * @param durableConfig Configuration for durable execution behavior - * @param lambdaContext AWS Lambda runtime context - * @param stepOperationId Unique identifier for this context instance that equals to step operation id - * @param stepOperationName the name of the step operation - * @param attempt the current retry attempt number (1-based) - */ - protected StepContext( - ExecutionManager executionManager, - DurableConfig durableConfig, - Context lambdaContext, - String stepOperationId, - String stepOperationName, - int attempt) { - super(executionManager, durableConfig, lambdaContext, stepOperationId, stepOperationName, ThreadType.STEP); - this.attempt = attempt; - } - - /** Returns the current retry attempt number (1-based). */ - public int getAttempt() { - return attempt; - } - - @Override - public DurableLogger getLogger() { - // lazy initialize logger - if (logger == null) { - synchronized (this) { - if (logger == null) { - logger = new DurableLogger(LoggerFactory.getLogger(StepContext.class), this); - } - } - } - return logger; - } - - /** Closes the logger for this context. */ - @Override - public void close() { - if (logger != null) { - logger.close(); - } - super.close(); - } +public interface StepContext extends BaseContext { + /** Returns the current retry attempt number (0-based). */ + int getAttempt(); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java new file mode 100644 index 000000000..89902f6a4 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java @@ -0,0 +1,137 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.context; + +import com.amazonaws.services.lambda.runtime.Context; +import software.amazon.lambda.durable.BaseContext; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.execution.SuspendExecutionException; +import software.amazon.lambda.durable.execution.ThreadContext; +import software.amazon.lambda.durable.execution.ThreadType; + +public abstract class BaseContextImpl implements AutoCloseable, BaseContext { + private final ExecutionManager executionManager; + private final DurableConfig durableConfig; + private final Context lambdaContext; + private final String contextId; + private final String contextName; + private final ThreadType threadType; + + private boolean isReplaying; + + /** + * Creates a new BaseContext instance. + * + * @param executionManager the execution manager for thread coordination and state management + * @param durableConfig the durable execution configuration + * @param lambdaContext the AWS Lambda runtime context + * @param contextId the context ID, null for root context, set for child contexts + * @param contextName the human-readable name for this context + * @param threadType the type of thread this context runs on + */ + protected BaseContextImpl( + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String contextId, + String contextName, + ThreadType threadType) { + this.executionManager = executionManager; + this.durableConfig = durableConfig; + this.lambdaContext = lambdaContext; + this.contextId = contextId; + this.contextName = contextName; + this.isReplaying = executionManager.hasOperationsForContext(contextId); + this.threadType = threadType; + + // write the thread id and type to thread local + executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType)); + } + + // =============== accessors ================ + + /** + * Returns the AWS Lambda runtime context. + * + * @return the Lambda context + */ + @Override + public Context getLambdaContext() { + return lambdaContext; + } + + /** + * Returns metadata about the current durable execution. + * + *

The execution context provides information that remains constant throughout the execution lifecycle, such as + * the durable execution ARN. This is useful for tracking execution progress, correlating logs, and referencing this + * execution in external systems. + * + * @return the execution context + */ + @Override + public String getExecutionArn() { + return executionManager.getDurableExecutionArn(); + } + + /** + * Returns the configuration for durable execution behavior. + * + * @return the durable configuration + */ + @Override + public DurableConfig getDurableConfig() { + return durableConfig; + } + + // ============= internal utilities =============== + + /** Gets the context ID for this context. Null for root context, set for child contexts. */ + @Override + public String getContextId() { + return contextId; + } + + /** Gets the context name for this context. Null for root context. */ + @Override + public String getContextName() { + return contextName; + } + + public ExecutionManager getExecutionManager() { + return executionManager; + } + + /** Returns whether this context is currently in replay mode. */ + @Override + public boolean isReplaying() { + return isReplaying; + } + + /** + * Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered. + */ + public void setExecutionMode() { + this.isReplaying = false; + } + + @Override + public void close() { + // this is called in the user thread, after the context's user code has completed + if (getContextId() != null) { + // if this is a child context or a step context, we need to + // deregister the context's thread from the execution manager + try { + executionManager.deregisterActiveThread(getContextId()); + } catch (SuspendExecutionException e) { + // Expected when this is the last active thread. Must catch here because: + // 1/ This runs in a worker thread detached from handlerFuture + // 2/ Uncaught exception would prevent stepAsync().get() from resume + // Suspension/Termination is already signaled via + // suspendExecutionFuture/terminateExecutionFuture + // before the throw. + } + } + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java new file mode 100644 index 000000000..ff2bfbb63 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java @@ -0,0 +1,729 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.context; + +import com.amazonaws.services.lambda.runtime.Context; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.lambda.durable.CallbackConfig; +import software.amazon.lambda.durable.DurableCallbackFuture; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableFuture; +import software.amazon.lambda.durable.InvokeConfig; +import software.amazon.lambda.durable.MapConfig; +import software.amazon.lambda.durable.MapFunction; +import software.amazon.lambda.durable.ParallelConfig; +import software.amazon.lambda.durable.ParallelContext; +import software.amazon.lambda.durable.StepConfig; +import software.amazon.lambda.durable.StepContext; +import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.WaitForCallbackConfig; +import software.amazon.lambda.durable.WaitForConditionConfig; +import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.execution.OperationIdGenerator; +import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.logging.DurableLogger; +import software.amazon.lambda.durable.model.MapResult; +import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; +import software.amazon.lambda.durable.operation.CallbackOperation; +import software.amazon.lambda.durable.operation.ChildContextOperation; +import software.amazon.lambda.durable.operation.InvokeOperation; +import software.amazon.lambda.durable.operation.MapOperation; +import software.amazon.lambda.durable.operation.ParallelOperation; +import software.amazon.lambda.durable.operation.StepOperation; +import software.amazon.lambda.durable.operation.WaitForConditionOperation; +import software.amazon.lambda.durable.operation.WaitOperation; +import software.amazon.lambda.durable.util.CompletedDurableFuture; +import software.amazon.lambda.durable.validation.ParameterValidator; + +/** + * User-facing API for defining durable operations within a workflow. + * + *

Provides methods for creating steps, waits, chained invokes, callbacks, and child contexts. Each method creates a + * checkpoint-backed operation that survives Lambda interruptions. + */ +public class DurableContextImpl extends BaseContextImpl implements DurableContext { + private static final String WAIT_FOR_CALLBACK_CALLBACK_SUFFIX = "-callback"; + private static final String WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX = "-submitter"; + private static final int MAX_WAIT_FOR_CALLBACK_NAME_LENGTH = ParameterValidator.MAX_OPERATION_NAME_LENGTH + - Math.max(WAIT_FOR_CALLBACK_CALLBACK_SUFFIX.length(), WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX.length()); + private final OperationIdGenerator operationIdGenerator; + private volatile DurableLogger logger; + + /** Shared initialization — sets all fields. */ + private DurableContextImpl( + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String contextId, + String contextName) { + super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT); + operationIdGenerator = new OperationIdGenerator(contextId); + } + + /** + * Creates a root context (contextId = null) + * + *

The context itself always has a null contextId (making it a root context). + * + * @param executionManager the execution manager + * @param durableConfig the durable configuration + * @param lambdaContext the Lambda context + * @return a new root DurableContext + */ + public static DurableContextImpl createRootContext( + ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) { + return new DurableContextImpl(executionManager, durableConfig, lambdaContext, null, null); + } + + /** + * Creates a child context. + * + * @param childContextId the child context's ID (the CONTEXT operation's operation ID) + * @param childContextName the name of the child context + * @return a new DurableContext for the child context + */ + public DurableContextImpl createChildContext(String childContextId, String childContextName) { + return new DurableContextImpl( + getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName); + } + + /** + * Creates a step context for executing step operations. + * + * @param stepOperationId the ID of the step operation (used for thread registration) + * @param stepOperationName the name of the step operation + * @param attempt the current retry attempt number (0-based) + * @return a new StepContext instance + */ + public StepContextImpl createStepContext(String stepOperationId, String stepOperationName, int attempt) { + return new StepContextImpl( + getExecutionManager(), + getDurableConfig(), + getLambdaContext(), + stepOperationId, + stepOperationName, + attempt); + } + + // ========== step methods ========== + + @Override + public T step(String name, Class resultType, Function func) { + return step(name, TypeToken.get(resultType), func, StepConfig.builder().build()); + } + + @Override + public T step(String name, Class resultType, Function func, StepConfig config) { + // Simply delegate to stepAsync and block on the result + return stepAsync(name, resultType, func, config).get(); + } + + @Override + public T step(String name, TypeToken typeToken, Function func) { + return step(name, typeToken, func, StepConfig.builder().build()); + } + + @Override + public T step(String name, TypeToken typeToken, Function func, StepConfig config) { + // Simply delegate to stepAsync and block on the result + return stepAsync(name, typeToken, func, config).get(); + } + + @Override + public DurableFuture stepAsync(String name, Class resultType, Function func) { + return stepAsync( + name, TypeToken.get(resultType), func, StepConfig.builder().build()); + } + + @Override + public DurableFuture stepAsync( + String name, Class resultType, Function func, StepConfig config) { + return stepAsync(name, TypeToken.get(resultType), func, config); + } + + @Override + public DurableFuture stepAsync(String name, TypeToken typeToken, Function func) { + return stepAsync(name, typeToken, func, StepConfig.builder().build()); + } + + @Override + public DurableFuture stepAsync( + String name, TypeToken typeToken, Function func, StepConfig config) { + Objects.requireNonNull(config, "config cannot be null"); + Objects.requireNonNull(typeToken, "typeToken cannot be null"); + ParameterValidator.validateOperationName(name); + + if (config.serDes() == null) { + config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); + } + var operationId = nextOperationId(); + + // Create and start step operation with TypeToken + var operation = new StepOperation<>( + OperationIdentifier.of(operationId, name, OperationType.STEP), func, typeToken, config, this); + + operation.execute(); // Start the step (returns immediately) + + return operation; + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public T step(String name, Class resultType, Supplier func) { + return stepAsync( + name, + TypeToken.get(resultType), + func, + StepConfig.builder().build()) + .get(); + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public T step(String name, Class resultType, Supplier func, StepConfig config) { + // Simply delegate to stepAsync and block on the result + return stepAsync(name, TypeToken.get(resultType), func, config).get(); + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public T step(String name, TypeToken typeToken, Supplier func) { + return stepAsync(name, typeToken, func, StepConfig.builder().build()).get(); + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public T step(String name, TypeToken typeToken, Supplier func, StepConfig config) { + // Simply delegate to stepAsync and block on the result + return stepAsync(name, typeToken, func, config).get(); + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public DurableFuture stepAsync(String name, Class resultType, Supplier func) { + return stepAsync( + name, TypeToken.get(resultType), func, StepConfig.builder().build()); + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public DurableFuture stepAsync(String name, Class resultType, Supplier func, StepConfig config) { + return stepAsync(name, TypeToken.get(resultType), func, config); + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func) { + return stepAsync(name, typeToken, func, StepConfig.builder().build()); + } + + /** @deprecated use the variants accepting StepContext instead */ + @Deprecated + @Override + public DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func, StepConfig config) { + return stepAsync(name, typeToken, stepContext -> func.get(), config); + } + + // ========== wait methods ========== + + @Override + public Void wait(String name, Duration duration) { + // Block (will throw SuspendExecutionException if there is no active thread) + return waitAsync(name, duration).get(); + } + + @Override + public DurableFuture waitAsync(String name, Duration duration) { + ParameterValidator.validateDuration(duration, "Wait duration"); + ParameterValidator.validateOperationName(name); + + var operationId = nextOperationId(); + + // Create and start wait operation + var operation = + new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this); + + operation.execute(); // Checkpoint the wait + return operation; + } + + // ========== chained invoke methods ========== + + @Override + public T invoke(String name, String functionName, U payload, Class resultType) { + return invokeAsync( + name, + functionName, + payload, + TypeToken.get(resultType), + InvokeConfig.builder().build()) + .get(); + } + + @Override + public T invoke(String name, String functionName, U payload, Class resultType, InvokeConfig config) { + return invokeAsync(name, functionName, payload, TypeToken.get(resultType), config) + .get(); + } + + @Override + public T invoke(String name, String functionName, U payload, TypeToken typeToken) { + return invokeAsync( + name, + functionName, + payload, + typeToken, + InvokeConfig.builder().build()) + .get(); + } + + @Override + public T invoke(String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config) { + return invokeAsync(name, functionName, payload, typeToken, config).get(); + } + + /** Asynchronously invokes another Lambda function with custom configuration. */ + @Override + public DurableFuture invokeAsync( + String name, String functionName, U payload, Class resultType, InvokeConfig config) { + return invokeAsync(name, functionName, payload, TypeToken.get(resultType), config); + } + + @Override + public DurableFuture invokeAsync(String name, String functionName, U payload, Class resultType) { + return invokeAsync( + name, + functionName, + payload, + TypeToken.get(resultType), + InvokeConfig.builder().build()); + } + + @Override + public DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType) { + return invokeAsync( + name, functionName, payload, resultType, InvokeConfig.builder().build()); + } + + @Override + public DurableFuture invokeAsync( + String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config) { + Objects.requireNonNull(config, "config cannot be null"); + Objects.requireNonNull(typeToken, "typeToken cannot be null"); + ParameterValidator.validateOperationName(name); + + if (config.serDes() == null) { + config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); + } + if (config.payloadSerDes() == null) { + config = config.toBuilder() + .payloadSerDes(getDurableConfig().getSerDes()) + .build(); + } + var operationId = nextOperationId(); + + // Create and start invoke operation + var operation = new InvokeOperation<>( + OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE), + functionName, + payload, + typeToken, + config, + this); + + operation.execute(); // checkpoint the invoke operation + return operation; // Block (will throw SuspendExecutionException if needed) + } + + // ========== createCallback methods ========== + + @Override + public DurableCallbackFuture createCallback(String name, Class resultType, CallbackConfig config) { + return createCallback(name, TypeToken.get(resultType), config); + } + + @Override + public DurableCallbackFuture createCallback(String name, TypeToken typeToken) { + return createCallback(name, typeToken, CallbackConfig.builder().build()); + } + + @Override + public DurableCallbackFuture createCallback(String name, Class resultType) { + return createCallback( + name, TypeToken.get(resultType), CallbackConfig.builder().build()); + } + + @Override + public DurableCallbackFuture createCallback(String name, TypeToken typeToken, CallbackConfig config) { + ParameterValidator.validateOperationName(name); + if (config.serDes() == null) { + config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); + } + var operationId = nextOperationId(); + + var operation = new CallbackOperation<>( + OperationIdentifier.of(operationId, name, OperationType.CALLBACK), typeToken, config, this); + operation.execute(); + + return operation; + } + + // ========== runInChildContext methods ========== + + @Override + public T runInChildContext(String name, Class resultType, Function func) { + return runInChildContextAsync(name, TypeToken.get(resultType), func).get(); + } + + @Override + public T runInChildContext(String name, TypeToken typeToken, Function func) { + return runInChildContextAsync(name, typeToken, func).get(); + } + + @Override + public DurableFuture runInChildContextAsync( + String name, Class resultType, Function func) { + return runInChildContextAsync(name, TypeToken.get(resultType), func); + } + + @Override + public DurableFuture runInChildContextAsync( + String name, TypeToken typeToken, Function func) { + return runInChildContextAsync(name, typeToken, func, OperationSubType.RUN_IN_CHILD_CONTEXT); + } + + private DurableFuture runInChildContextAsync( + String name, TypeToken typeToken, Function func, OperationSubType subType) { + Objects.requireNonNull(typeToken, "typeToken cannot be null"); + ParameterValidator.validateOperationName(name); + var operationId = nextOperationId(); + + var operation = new ChildContextOperation<>( + OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType), + func, + typeToken, + getDurableConfig().getSerDes(), + this); + + operation.execute(); + return operation; + } + + // ========== map methods ========== + + @Override + public MapResult map(String name, Collection items, Class resultType, MapFunction function) { + return mapAsync( + name, + items, + TypeToken.get(resultType), + function, + MapConfig.builder().build()) + .get(); + } + + @Override + public MapResult map( + String name, Collection items, Class resultType, MapFunction function, MapConfig config) { + return mapAsync(name, items, TypeToken.get(resultType), function, config) + .get(); + } + + @Override + public MapResult map( + String name, Collection items, TypeToken resultType, MapFunction function) { + return mapAsync(name, items, resultType, function, MapConfig.builder().build()) + .get(); + } + + @Override + public MapResult map( + String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config) { + return mapAsync(name, items, resultType, function, config).get(); + } + + @Override + public DurableFuture> mapAsync( + String name, Collection items, Class resultType, MapFunction function) { + return mapAsync( + name, + items, + TypeToken.get(resultType), + function, + MapConfig.builder().build()); + } + + @Override + public DurableFuture> mapAsync( + String name, Collection items, Class resultType, MapFunction function, MapConfig config) { + return mapAsync(name, items, TypeToken.get(resultType), function, config); + } + + @Override + public DurableFuture> mapAsync( + String name, Collection items, TypeToken resultType, MapFunction function) { + return mapAsync(name, items, resultType, function, MapConfig.builder().build()); + } + + @Override + public DurableFuture> mapAsync( + String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config) { + Objects.requireNonNull(items, "items cannot be null"); + Objects.requireNonNull(function, "function cannot be null"); + Objects.requireNonNull(resultType, "resultType cannot be null"); + Objects.requireNonNull(config, "config cannot be null"); + ParameterValidator.validateOperationName(name); + ParameterValidator.validateOrderedCollection(items); + + if (config.serDes() == null) { + config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build(); + } + + // Short-circuit for empty collections — no checkpoint overhead + if (items.isEmpty()) { + return new CompletedDurableFuture<>(MapResult.empty()); + } + + // Convert to List for deterministic index-based access + var itemList = List.copyOf(items); + var operationId = nextOperationId(); + + var operation = new MapOperation<>(operationId, name, itemList, function, resultType, config, this); + operation.execute(); + return operation; + } + + // ========== parallel methods ========== + + @Override + public ParallelContext parallel(String name, ParallelConfig config) { + Objects.requireNonNull(config, "config cannot be null"); + var operationId = nextOperationId(); + + var parallelOp = new ParallelOperation<>( + OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL), + TypeToken.get(Void.class), + getDurableConfig().getSerDes(), + this, + config.maxConcurrency(), + config.minSuccessful(), + config.toleratedFailureCount()); + + parallelOp.execute(); + + return new ParallelContext(parallelOp, this); + } + + // ========= waitForCallback methods ============= + + @Override + public T waitForCallback(String name, Class resultType, BiConsumer func) { + return waitForCallbackAsync( + name, + TypeToken.get(resultType), + func, + WaitForCallbackConfig.builder().build()) + .get(); + } + + @Override + public T waitForCallback(String name, TypeToken typeToken, BiConsumer func) { + return waitForCallbackAsync( + name, typeToken, func, WaitForCallbackConfig.builder().build()) + .get(); + } + + @Override + public T waitForCallback( + String name, + Class resultType, + BiConsumer func, + WaitForCallbackConfig waitForCallbackConfig) { + return waitForCallbackAsync(name, TypeToken.get(resultType), func, waitForCallbackConfig) + .get(); + } + + @Override + public T waitForCallback( + String name, + TypeToken typeToken, + BiConsumer func, + WaitForCallbackConfig waitForCallbackConfig) { + return waitForCallbackAsync(name, typeToken, func, waitForCallbackConfig) + .get(); + } + + @Override + public DurableFuture waitForCallbackAsync( + String name, Class resultType, BiConsumer func) { + return waitForCallbackAsync( + name, + TypeToken.get(resultType), + func, + WaitForCallbackConfig.builder().build()); + } + + @Override + public DurableFuture waitForCallbackAsync( + String name, TypeToken typeToken, BiConsumer func) { + return waitForCallbackAsync( + name, typeToken, func, WaitForCallbackConfig.builder().build()); + } + + @Override + public DurableFuture waitForCallbackAsync( + String name, + Class resultType, + BiConsumer func, + WaitForCallbackConfig waitForCallbackConfig) { + return waitForCallbackAsync(name, TypeToken.get(resultType), func, waitForCallbackConfig); + } + + @Override + public DurableFuture waitForCallbackAsync( + String name, + TypeToken typeToken, + BiConsumer func, + WaitForCallbackConfig waitForCallbackConfig) { + Objects.requireNonNull(typeToken, "typeToken cannot be null"); + Objects.requireNonNull(waitForCallbackConfig, "waitForCallbackConfig cannot be null"); + // waitForCallback adds a suffix for the callback operation name and the submitter operation name so + // the length restriction of waitForCallback name is different from the other operations. + ParameterValidator.validateOperationName(name, MAX_WAIT_FOR_CALLBACK_NAME_LENGTH); + + var finalWaitForCallbackConfig = waitForCallbackConfig.stepConfig().serDes() == null + ? waitForCallbackConfig.toBuilder() + .stepConfig(waitForCallbackConfig.stepConfig().toBuilder() + .serDes(getDurableConfig().getSerDes()) + .build()) + .build() + : waitForCallbackConfig; + + return runInChildContextAsync( + name, + typeToken, + childCtx -> { + var callback = childCtx.createCallback( + name + WAIT_FOR_CALLBACK_CALLBACK_SUFFIX, + typeToken, + finalWaitForCallbackConfig.callbackConfig()); + childCtx.step( + name + WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX, + Void.class, + stepCtx -> { + func.accept(callback.callbackId(), stepCtx); + return null; + }, + finalWaitForCallbackConfig.stepConfig()); + return callback.get(); + }, + OperationSubType.WAIT_FOR_CALLBACK); + } + + // ========== waitForCondition methods ========== + @Override + public T waitForCondition( + String name, + Class resultType, + BiFunction checkFunc, + WaitForConditionConfig config) { + return waitForConditionAsync(name, TypeToken.get(resultType), checkFunc, config) + .get(); + } + + @Override + public T waitForCondition( + String name, + TypeToken typeToken, + BiFunction checkFunc, + WaitForConditionConfig config) { + return waitForConditionAsync(name, typeToken, checkFunc, config).get(); + } + + @Override + public DurableFuture waitForConditionAsync( + String name, + Class resultType, + BiFunction checkFunc, + WaitForConditionConfig config) { + return waitForConditionAsync(name, TypeToken.get(resultType), checkFunc, config); + } + + @Override + public DurableFuture waitForConditionAsync( + String name, + TypeToken typeToken, + BiFunction checkFunc, + WaitForConditionConfig config) { + Objects.requireNonNull(config, "config cannot be null"); + Objects.requireNonNull(typeToken, "typeToken cannot be null"); + ParameterValidator.validateOperationName(name); + + if (config.serDes() == null) { + config = WaitForConditionConfig.builder(config.waitStrategy(), config.initialState()) + .serDes(getDurableConfig().getSerDes()) + .build(); + } + var operationId = nextOperationId(); + + var operation = new WaitForConditionOperation<>(operationId, name, checkFunc, typeToken, config, this); + + operation.execute(); + + return operation; + } + + // =============== accessors ================ + @Override + public DurableLogger getLogger() { + // lazy initialize logger + if (logger == null) { + synchronized (this) { + if (logger == null) { + logger = new DurableLogger(LoggerFactory.getLogger(DurableContext.class), this); + } + } + } + return logger; + } + + /** + * Clears the logger's thread properties. Called during context destruction to prevent memory leaks and ensure clean + * state for subsequent executions. + */ + @Override + public void close() { + if (logger != null) { + logger.close(); + } + super.close(); + } + + /** + * Get the next operationId. Returns a globally unique operation ID by hashing a sequential operation counter. For + * root contexts, the counter value is hashed directly (e.g. "1", "2", "3"). For child contexts, the values are + * prefixed with the parent hashed contextId (e.g. "-1", "-2" inside parent context ). This + * matches the Python SDK's stepPrefix convention and prevents ID collisions in checkpoint batches. + */ + private String nextOperationId() { + return operationIdGenerator.nextOperationId(); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java new file mode 100644 index 000000000..874c5dce1 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java @@ -0,0 +1,72 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.context; + +import com.amazonaws.services.lambda.runtime.Context; +import org.slf4j.LoggerFactory; +import software.amazon.lambda.durable.BaseContext; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.StepContext; +import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.logging.DurableLogger; + +/** + * Context available inside a step operation's user function. + * + *

Provides access to the current retry attempt number and a logger that includes execution metadata. Extends + * {@link BaseContext} for thread lifecycle management. + */ +public class StepContextImpl extends BaseContextImpl implements StepContext { + private volatile DurableLogger logger; + private final int attempt; + + /** + * Creates a new StepContext instance for use in step operations. + * + * @param executionManager Manages durable execution state and operations + * @param durableConfig Configuration for durable execution behavior + * @param lambdaContext AWS Lambda runtime context + * @param stepOperationId Unique identifier for this context instance that equals to step operation id + * @param stepOperationName the name of the step operation + * @param attempt the current retry attempt number (0-based) + */ + protected StepContextImpl( + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String stepOperationId, + String stepOperationName, + int attempt) { + super(executionManager, durableConfig, lambdaContext, stepOperationId, stepOperationName, ThreadType.STEP); + this.attempt = attempt; + } + + /** Returns the current retry attempt number (0-based). */ + @Override + public int getAttempt() { + return attempt; + } + + @Override + public DurableLogger getLogger() { + // lazy initialize logger + if (logger == null) { + synchronized (this) { + if (logger == null) { + logger = new DurableLogger(LoggerFactory.getLogger(StepContext.class), this); + } + } + } + return logger; + } + + /** Closes the logger for this context. */ + @Override + public void close() { + if (logger != null) { + logger.close(); + } + super.close(); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java index 41f8157f7..5d95d34f2 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java @@ -4,9 +4,9 @@ import org.slf4j.Logger; import org.slf4j.MDC; -import software.amazon.lambda.durable.BaseContext; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.StepContext; +import software.amazon.lambda.durable.context.BaseContextImpl; /** * Logger wrapper that adds durable execution context to log entries via MDC and optionally suppresses logs during @@ -22,7 +22,7 @@ public class DurableLogger { static final String MDC_ATTEMPT = "attempt"; private final Logger delegate; - private final BaseContext context; + private final BaseContextImpl context; /** * Creates a DurableLogger wrapping the given SLF4J logger with execution context MDC entries. @@ -30,12 +30,12 @@ public class DurableLogger { * @param delegate the SLF4J logger to wrap * @param context the durable execution context providing MDC values */ - public DurableLogger(Logger delegate, BaseContext context) { + public DurableLogger(Logger delegate, BaseContextImpl context) { this.delegate = delegate; this.context = context; // execution arn - MDC.put(MDC_EXECUTION_ARN, context.getExecutionContext().getDurableExecutionArn()); + MDC.put(MDC_EXECUTION_ARN, context.getExecutionArn()); // lambda request id var requestId = diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java index 7a7456236..a7abbfb6b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java @@ -23,6 +23,7 @@ import software.amazon.lambda.durable.CompletionConfig; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.OperationIdGenerator; import software.amazon.lambda.durable.model.CompletionReason; import software.amazon.lambda.durable.model.OperationIdentifier; @@ -56,7 +57,7 @@ public abstract class BaseConcurrentOperation extends BaseDurableOperation private final OperationSubType subType; private volatile CompletionReason completionReason; private volatile boolean earlyTermination = false; - private DurableContext rootContext; + private DurableContextImpl rootContext; private OperationIdGenerator operationIdGenerator; protected BaseConcurrentOperation( @@ -67,7 +68,7 @@ protected BaseConcurrentOperation( CompletionConfig completionConfig, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext) { + DurableContextImpl durableContext) { super( OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType), resultTypeToken, diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index 1a058b55c..bd951b407 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -12,9 +12,9 @@ import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.DurableFuture; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.IllegalDurableOperationException; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; import software.amazon.lambda.durable.exception.SerDesException; @@ -53,7 +53,7 @@ public abstract class BaseDurableOperation implements DurableFuture { private final TypeToken resultTypeToken; private final SerDes resultSerDes; protected final CompletableFuture completionFuture; - private final DurableContext durableContext; + private final DurableContextImpl durableContext; /** * Constructs a new durable operation. @@ -67,7 +67,7 @@ protected BaseDurableOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext) { + DurableContextImpl durableContext) { this.operationIdentifier = operationIdentifier; this.durableContext = durableContext; this.executionManager = durableContext.getExecutionManager(); @@ -96,7 +96,7 @@ public String getName() { } /** Gets the parent context. */ - protected DurableContext getContext() { + protected DurableContextImpl getContext() { return durableContext; } @@ -116,6 +116,9 @@ public void execute() { validateReplay(existing); replay(existing); } else { + if (durableContext.isReplaying()) { + this.durableContext.setExecutionMode(); + } start(); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/CallbackOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/CallbackOperation.java index 8d94464b0..9e1f63fc2 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/CallbackOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/CallbackOperation.java @@ -8,8 +8,8 @@ import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.lambda.durable.CallbackConfig; import software.amazon.lambda.durable.DurableCallbackFuture; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.CallbackFailedException; import software.amazon.lambda.durable.exception.CallbackTimeoutException; import software.amazon.lambda.durable.model.OperationIdentifier; @@ -25,7 +25,7 @@ public CallbackOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, CallbackConfig config, - DurableContext durableContext) { + DurableContextImpl durableContext) { super(operationIdentifier, resultTypeToken, config.serDes(), durableContext); this.config = config; } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index 45d3464de..d94e95610 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -17,6 +17,7 @@ import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.CallbackFailedException; import software.amazon.lambda.durable.exception.CallbackSubmitterException; import software.amazon.lambda.durable.exception.CallbackTimeoutException; @@ -55,7 +56,7 @@ public ChildContextOperation( Function function, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext) { + DurableContextImpl durableContext) { this(operationIdentifier, function, resultTypeToken, resultSerDes, durableContext, null); } @@ -64,7 +65,7 @@ public ChildContextOperation( Function function, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext, + DurableContextImpl durableContext, ConcurrencyOperation parentOperation) { super(operationIdentifier, resultTypeToken, resultSerDes, durableContext); this.function = function; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java index 99ab7224e..016cab11e 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.OperationIdGenerator; import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus; import software.amazon.lambda.durable.model.OperationIdentifier; @@ -61,7 +62,7 @@ protected ConcurrencyOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext, + DurableContextImpl durableContext, int maxConcurrency, int minSuccessful, int toleratedFailureCount, @@ -78,7 +79,7 @@ protected ConcurrencyOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext, + DurableContextImpl durableContext, int maxConcurrency, int minSuccessful, int toleratedFailureCount) { @@ -113,7 +114,7 @@ protected abstract ChildContextOperation createItem( Function function, TypeToken resultType, SerDes serDes, - DurableContext parentContext); + DurableContextImpl parentContext); /** * Called when the concurrency operation succeeds (minSuccessful threshold met). Subclasses define checkpointing @@ -134,7 +135,6 @@ protected abstract ChildContextOperation createItem( * @param function the user function to execute * @param resultType the result type token * @param serDes the serializer/deserializer - * @param parentContext the parent durable context for creating child operations * @param the result type of the child operation * @return the created ChildContextOperation */ diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java index 8e1dc08d2..2eb540538 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java @@ -6,9 +6,9 @@ import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationAction; import software.amazon.awssdk.services.lambda.model.OperationUpdate; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.InvokeConfig; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.InvokeException; import software.amazon.lambda.durable.exception.InvokeFailedException; import software.amazon.lambda.durable.exception.InvokeStoppedException; @@ -34,7 +34,7 @@ public InvokeOperation( U payload, TypeToken resultTypeToken, InvokeConfig config, - DurableContext durableContext) { + DurableContextImpl durableContext) { super(operationIdentifier, resultTypeToken, config.serDes(), durableContext); this.functionName = functionName; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java index 924bb5211..e0f5e4760 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java @@ -6,10 +6,10 @@ import java.util.Collections; import java.util.List; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.MapConfig; import software.amazon.lambda.durable.MapFunction; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.model.CompletionReason; import software.amazon.lambda.durable.model.MapResult; import software.amazon.lambda.durable.model.MapResultItem; @@ -37,7 +37,7 @@ public MapOperation( MapFunction function, TypeToken itemResultType, MapConfig config, - DurableContext durableContext) { + DurableContextImpl durableContext) { super( operationId, name, diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index b4b0b5978..91388b8c7 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -10,6 +10,7 @@ import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.ConcurrencyExecutionException; import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus; import software.amazon.lambda.durable.model.OperationIdentifier; @@ -45,7 +46,7 @@ public ParallelOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext, + DurableContextImpl durableContext, int maxConcurrency, int minSuccessful, int toleratedFailureCount) { @@ -66,7 +67,7 @@ protected ChildContextOperation createItem( Function function, TypeToken resultType, SerDes serDes, - DurableContext parentContext) { + DurableContextImpl parentContext) { return new ChildContextOperation<>( OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL_BRANCH), function, diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index 57898506f..afbf3b1fa 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -13,11 +13,11 @@ import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.awssdk.services.lambda.model.StepOptions; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.StepConfig; import software.amazon.lambda.durable.StepContext; import software.amazon.lambda.durable.StepSemantics; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.DurableOperationException; import software.amazon.lambda.durable.exception.StepFailedException; import software.amazon.lambda.durable.exception.StepInterruptedException; @@ -46,7 +46,7 @@ public StepOperation( Function function, TypeToken resultTypeToken, StepConfig config, - DurableContext durableContext) { + DurableContextImpl durableContext) { super(operationIdentifier, resultTypeToken, config.serDes(), durableContext); this.function = function; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index 58630b28c..4986ca90a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -11,11 +11,11 @@ import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.awssdk.services.lambda.model.StepOptions; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.StepContext; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.WaitForConditionConfig; import software.amazon.lambda.durable.WaitForConditionDecision; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.DurableOperationException; import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.exception.WaitForConditionException; @@ -45,7 +45,7 @@ public WaitForConditionOperation( BiFunction checkFunc, TypeToken resultTypeToken, WaitForConditionConfig config, - DurableContext durableContext) { + DurableContextImpl durableContext) { super( OperationIdentifier.of(operationId, name, OperationType.STEP, OperationSubType.WAIT_FOR_CONDITION), resultTypeToken, diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java index c8efae138..5034eb7c0 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java @@ -11,8 +11,8 @@ import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.awssdk.services.lambda.model.WaitOptions; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.serde.NoopSerDes; import software.amazon.lambda.durable.serde.SerDes; @@ -30,7 +30,8 @@ public class WaitOperation extends BaseDurableOperation { private final Duration duration; - public WaitOperation(OperationIdentifier operationIdentifier, Duration duration, DurableContext durableContext) { + public WaitOperation( + OperationIdentifier operationIdentifier, Duration duration, DurableContextImpl durableContext) { super(operationIdentifier, TypeToken.get(Void.class), NOOP_SER_DES, durableContext); this.duration = duration; } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/CompletedDurableFuture.java b/sdk/src/main/java/software/amazon/lambda/durable/util/CompletedDurableFuture.java similarity index 68% rename from sdk/src/main/java/software/amazon/lambda/durable/CompletedDurableFuture.java rename to sdk/src/main/java/software/amazon/lambda/durable/util/CompletedDurableFuture.java index 47245e2a9..b75f664dc 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/CompletedDurableFuture.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/util/CompletedDurableFuture.java @@ -1,6 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable; +package software.amazon.lambda.durable.util; + +import software.amazon.lambda.durable.DurableFuture; /** * A {@link DurableFuture} that is already completed with a value. @@ -9,10 +11,10 @@ * * @param the result type */ -class CompletedDurableFuture implements DurableFuture { +public class CompletedDurableFuture implements DurableFuture { private final T value; - CompletedDurableFuture(T value) { + public CompletedDurableFuture(T value) { this.value = value; } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java b/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java index 81e8c580d..5dab09890 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java @@ -9,6 +9,7 @@ import java.util.List; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.lambda.model.*; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadContext; @@ -46,7 +47,7 @@ private DurableContext createTestContext(List initialOperations) { "test-token", initialExecutionState), DurableConfig.builder().withDurableExecutionClient(client).build()); - var root = DurableContext.createRootContext( + var root = DurableContextImpl.createRootContext( executionManager, DurableConfig.builder().build(), null); executionManager.registerActiveThread(null); executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT)); @@ -65,14 +66,11 @@ void testContextCreation() { void testGetExecutionContext() { var context = createTestContext(); - var executionContext = context.getExecutionContext(); - - assertNotNull(executionContext); - assertNotNull(executionContext.getDurableExecutionArn()); + assertNotNull(context.getExecutionArn()); assertEquals( "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" + EXECUTION_NAME + "/" + INVOCATION_ID, - executionContext.getDurableExecutionArn()); + context.getExecutionArn()); } @Test diff --git a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java index 13db5206e..05b49124f 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java @@ -15,6 +15,7 @@ import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.StepDetails; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; @@ -42,7 +43,7 @@ private DurableContext createTestContext(List initialOperations) { new DurableExecutionInput( "arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", initialExecutionState), DurableConfig.builder().withDurableExecutionClient(client).build()); - var context = DurableContext.createRootContext( + var context = DurableContextImpl.createRootContext( executionManager, DurableConfig.builder().build(), null); executionManager.setCurrentThreadContext(new ThreadContext(INVOCATION_ID + "-execution", ThreadType.CONTEXT)); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java index 5847827c4..8b52ff6b1 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java @@ -10,8 +10,8 @@ import org.slf4j.Logger; import org.slf4j.MDC; import software.amazon.lambda.durable.DurableConfig; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TestContext; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.ExecutionManager; class DurableLoggerTest { @@ -44,8 +44,8 @@ private DurableLogger createLogger(Mode mode, Suppression suppression) { return new DurableLogger(mockLogger, createDurableContext(REQUEST_ID, suppression)); } - private DurableContext createDurableContext(String requestId, Suppression suppression) { - return DurableContext.createRootContext( + private DurableContextImpl createDurableContext(String requestId, Suppression suppression) { + return DurableContextImpl.createRootContext( mockExecutionManager, DurableConfig.builder() .withLoggerConfig(new LoggerConfig(suppression == Suppression.ENABLED)) diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java index aedb2b046..8df56e796 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java @@ -27,8 +27,8 @@ import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.IllegalDurableOperationException; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; import software.amazon.lambda.durable.exception.SerDesException; @@ -54,12 +54,12 @@ class BaseDurableOperationTest { private final ExecutorService internalExecutor = Executors.newFixedThreadPool(2); private ExecutionManager executionManager; - private DurableContext durableContext; + private DurableContextImpl durableContext; @BeforeEach void setUp() { executionManager = mock(ExecutionManager.class); - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); when(durableContext.getExecutionManager()).thenReturn(executionManager); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext(CONTEXT_ID, ThreadType.CONTEXT)); when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)).thenReturn(OPERATION); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java index 66bc312c0..a90b0c2ff 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java @@ -15,9 +15,9 @@ import software.amazon.awssdk.services.lambda.model.*; import software.amazon.lambda.durable.CallbackConfig; import software.amazon.lambda.durable.DurableConfig; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TestUtils; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.CallbackFailedException; import software.amazon.lambda.durable.exception.CallbackTimeoutException; import software.amazon.lambda.durable.exception.SerDesException; @@ -36,11 +36,11 @@ class CallbackOperationTest { private static final OperationIdentifier OPERATION_IDENTIFIER = OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CALLBACK); - private DurableContext durableContext; + private DurableContextImpl durableContext; @BeforeEach void setUp() { - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); } /** Custom SerDes that tracks deserialization calls. */ diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java index ded393d20..0e6158ed1 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java @@ -20,6 +20,7 @@ import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.ChildContextFailedException; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; import software.amazon.lambda.durable.execution.ExecutionManager; @@ -34,12 +35,12 @@ class ChildContextOperationTest { private static final JacksonSerDes SERDES = new JacksonSerDes(); - private DurableContext durableContext; + private DurableContextImpl durableContext; private ExecutionManager executionManager; @BeforeEach void setUp() { - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); executionManager = mock(ExecutionManager.class); when(durableContext.getExecutionManager()).thenReturn(executionManager); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT)); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java index 23bd8d0e9..8c4bb710f 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java @@ -20,6 +20,7 @@ import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.OperationIdGenerator; import software.amazon.lambda.durable.execution.ThreadContext; @@ -36,14 +37,14 @@ class ConcurrencyOperationTest { private static final String OPERATION_ID = "op-1"; private static final TypeToken RESULT_TYPE = TypeToken.get(Void.class); - private DurableContext durableContext; + private DurableContextImpl durableContext; private ExecutionManager executionManager; private AtomicInteger operationIdCounter; private OperationIdGenerator mockIdGenerator; @BeforeEach void setUp() { - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); executionManager = mock(ExecutionManager.class); operationIdCounter = new AtomicInteger(0); @@ -186,7 +187,7 @@ static class TestConcurrencyOperation extends ConcurrencyOperation { OperationIdentifier operationIdentifier, TypeToken resultTypeToken, SerDes resultSerDes, - DurableContext durableContext, + DurableContextImpl durableContext, int maxConcurrency, int minSuccessful, int toleratedFailureCount) { @@ -207,7 +208,7 @@ protected ChildContextOperation createItem( Function function, TypeToken resultType, SerDes serDes, - DurableContext parentContext) { + DurableContextImpl parentContext) { return new ChildContextOperation( OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL_BRANCH), function, diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java index 67277589a..f370805c9 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java @@ -14,9 +14,9 @@ import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationType; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.InvokeConfig; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.InvokeException; import software.amazon.lambda.durable.exception.InvokeFailedException; import software.amazon.lambda.durable.exception.InvokeStoppedException; @@ -34,12 +34,12 @@ class InvokeOperationTest { OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CHAINED_INVOKE); private ExecutionManager executionManager; - private DurableContext durableContext; + private DurableContextImpl durableContext; @BeforeEach void setUp() { executionManager = mock(ExecutionManager.class); - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); when(durableContext.getExecutionManager()).thenReturn(executionManager); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("root", ThreadType.CONTEXT)); } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index 9fa3a3e00..dc6a93bd6 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -19,8 +19,8 @@ import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.lambda.durable.DurableConfig; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.OperationIdGenerator; import software.amazon.lambda.durable.execution.ThreadContext; @@ -36,14 +36,14 @@ class ParallelOperationTest { private static final String OPERATION_ID = "parallel-op-1"; private static final TypeToken RESULT_TYPE = TypeToken.get(Void.class); - private DurableContext durableContext; + private DurableContextImpl durableContext; private ExecutionManager executionManager; private AtomicInteger operationIdCounter; private OperationIdGenerator mockIdGenerator; @BeforeEach void setUp() { - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); executionManager = mock(ExecutionManager.class); operationIdCounter = new AtomicInteger(0); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java index 14ac0b9db..65cb64a7d 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java @@ -15,9 +15,9 @@ import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.StepDetails; import software.amazon.lambda.durable.DurableConfig; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.StepConfig; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.StepFailedException; import software.amazon.lambda.durable.exception.StepInterruptedException; import software.amazon.lambda.durable.execution.ExecutionManager; @@ -34,12 +34,12 @@ class StepOperationTest { private static final OperationIdentifier OPERATION_IDENTIFIER = OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.STEP); private ExecutionManager executionManager; - private DurableContext durableContext; + private DurableContextImpl durableContext; @BeforeEach void setUp() { executionManager = mock(ExecutionManager.class); - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); when(durableContext.getExecutionManager()).thenReturn(executionManager); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("handler", ThreadType.CONTEXT)); when(durableContext.getDurableConfig()) diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java index 6f966917b..fa1a66dfe 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java @@ -16,10 +16,10 @@ import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.StepDetails; import software.amazon.lambda.durable.DurableConfig; -import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.WaitForConditionConfig; import software.amazon.lambda.durable.WaitForConditionDecision; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; import software.amazon.lambda.durable.exception.WaitForConditionException; import software.amazon.lambda.durable.execution.ExecutionManager; @@ -34,12 +34,12 @@ class WaitForConditionOperationTest { private static final JacksonSerDes SERDES = new JacksonSerDes(); private ExecutionManager executionManager; - private DurableContext durableContext; + private DurableContextImpl durableContext; @BeforeEach void setUp() { executionManager = mock(ExecutionManager.class); - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); when(durableContext.getExecutionManager()).thenReturn(executionManager); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("handler", ThreadType.CONTEXT)); when(durableContext.getDurableConfig()) diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java index d93656d64..d07501dbb 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java @@ -14,7 +14,7 @@ import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.WaitDetails; -import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; @@ -27,12 +27,12 @@ class WaitOperationTest { private static final OperationIdentifier OPERATION_IDENTIFIER = OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.WAIT); private ExecutionManager executionManager; - private DurableContext durableContext; + private DurableContextImpl durableContext; @BeforeEach void setUp() { executionManager = mock(ExecutionManager.class); - durableContext = mock(DurableContext.class); + durableContext = mock(DurableContextImpl.class); when(durableContext.getExecutionManager()).thenReturn(executionManager); } From b02a62e3ff278f1b9661ba1ce07fdd83c2e89b7b Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 17 Mar 2026 14:29:21 -0700 Subject: [PATCH 2/2] remove ExecutionContext --- .../lambda/durable/ExecutionContext.java | 38 ------------------- .../lambda/durable/ExecutionContextTest.java | 30 --------------- 2 files changed, 68 deletions(-) delete mode 100644 sdk/src/main/java/software/amazon/lambda/durable/ExecutionContext.java delete mode 100644 sdk/src/test/java/software/amazon/lambda/durable/ExecutionContextTest.java diff --git a/sdk/src/main/java/software/amazon/lambda/durable/ExecutionContext.java b/sdk/src/main/java/software/amazon/lambda/durable/ExecutionContext.java deleted file mode 100644 index 759d448c8..000000000 --- a/sdk/src/main/java/software/amazon/lambda/durable/ExecutionContext.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable; - -/** - * Provides metadata about the current durable execution. - * - *

This context contains information about the execution environment that remains constant throughout the execution - * lifecycle. Access it via {@link DurableContext#getExecutionContext()}. - */ -public class ExecutionContext { - private final String durableExecutionArn; - - ExecutionContext(String durableExecutionArn) { - this.durableExecutionArn = durableExecutionArn; - } - - /** - * Returns the ARN of the current durable execution. - * - *

The durable execution ARN uniquely identifies this execution instance and remains constant across all - * invocations and replays. Use this ARN to: - * - *

- * - *

Example ARN format: - * {@code arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST/durable-execution/349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d} - * - * @return the durable execution ARN - */ - public String getDurableExecutionArn() { - return durableExecutionArn; - } -} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/ExecutionContextTest.java b/sdk/src/test/java/software/amazon/lambda/durable/ExecutionContextTest.java deleted file mode 100644 index 0e7a72d26..000000000 --- a/sdk/src/test/java/software/amazon/lambda/durable/ExecutionContextTest.java +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import org.junit.jupiter.api.Test; - -class ExecutionContextTest { - - @Test - void constructorSetsArn() { - var arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST/durable-execution/" - + "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d"; - var context = new ExecutionContext(arn); - - assertEquals(arn, context.getDurableExecutionArn()); - } - - @Test - void getDurableExecutionArnReturnsCorrectValue() { - var arn = "arn:aws:lambda:eu-west-1:987654321098:function:test-fn:$LATEST/durable-execution/" - + "a1b2c3d4-e5f6-7890-abcd-ef1234567890/b2c3d4e5-f6a7-8901-bcde-f12345678901"; - var context = new ExecutionContext(arn); - - assertNotNull(context.getDurableExecutionArn()); - assertEquals(arn, context.getDurableExecutionArn()); - } -}