Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.parallel;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelConfig;

/**
* Example demonstrating parallel branches where some branches include wait operations.
*
* <p>This models a notification fan-out pattern where different channels have different delivery delays:
*
* <ul>
* <li>Email — sent immediately
* <li>SMS — waits for a rate-limit window before sending
* <li>Push notification — waits for a quiet-hours window before sending
* </ul>
*
* <p>All three branches run concurrently. Branches with waits suspend without consuming compute resources and resume
* automatically once the wait elapses. The parallel operation completes once all branches finish.
*/
public class ParallelWithWaitExample
extends DurableHandler<ParallelWithWaitExample.Input, ParallelWithWaitExample.Output> {

public record Input(String userId, String message) {}

public record Output(List<String> deliveries) {}

@Override
public Output handleRequest(Input input, DurableContext context) {
var logger = context.getLogger();
logger.info("Sending notifications to user {}", input.userId());

var config = ParallelConfig.builder().build();
var futures = new ArrayList<DurableFuture<String>>(3);

try (var parallel = context.parallel("notify", config)) {

// Branch 1: email — no wait, deliver immediately
futures.add(parallel.branch("email", String.class, ctx -> {
ctx.wait("email-rate-limit-delay", Duration.ofSeconds(10));
return ctx.step("send-email", String.class, stepCtx -> "email:" + input.message());
}));

// Branch 2: SMS — wait for rate-limit window, then send
futures.add(parallel.branch("sms", String.class, ctx -> {
ctx.wait("sms-rate-limit-delay", Duration.ofSeconds(10));
return ctx.step("send-sms", String.class, stepCtx -> "sms:" + input.message());
}));

// Branch 3: push notification — wait for quiet-hours window, then send
futures.add(parallel.branch("push", String.class, ctx -> {
ctx.wait("push-quiet-delay", Duration.ofSeconds(10));
return ctx.step("send-push", String.class, stepCtx -> "push:" + input.message());
}));
}

var deliveries = futures.stream().map(DurableFuture::get).toList();
logger.info("All {} notifications delivered", deliveries.size());
// Test replay
context.wait("wait for finalization", Duration.ofSeconds(5));
return new Output(deliveries);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.parallel;

import static org.junit.jupiter.api.Assertions.*;

import java.util.List;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class ParallelWithWaitExampleTest {
@Test
void completesAfterManuallyAdvancingWaits() {
var handler = new ParallelWithWaitExample();
var runner = LocalDurableTestRunner.create(ParallelWithWaitExample.Input.class, handler);

var input = new ParallelWithWaitExample.Input("user-456", "world");

// First run suspends on wait branches
var first = runner.run(input);
assertEquals(ExecutionStatus.PENDING, first.getStatus());

// Advance waits and re-run to completion
runner.advanceTime();
var result = runner.runUntilComplete(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

var output = result.getResult(ParallelWithWaitExample.Output.class);
assertEquals(List.of("email:world", "sms:world", "push:world"), output.deliveries());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,28 @@ protected BaseContextImpl(
String contextId,
String contextName,
ThreadType threadType) {
this(executionManager, durableConfig, lambdaContext, contextId, contextName, threadType, true);
}

/**
* Creates a new BaseContext instance.
*
* @param executionManager the execution manager for thread coordination and state management
* @param durableConfig the durable execution configuration
* @param lambdaContext the AWS Lambda runtime context
* @param contextId the context ID, null for root context, set for child contexts
* @param contextName the human-readable name for this context
* @param threadType the type of thread this context runs on
* @param setCurrentThreadContext whether to call setCurrentThreadContext on the execution manager
*/
protected BaseContextImpl(
ExecutionManager executionManager,
DurableConfig durableConfig,
Context lambdaContext,
String contextId,
String contextName,
ThreadType threadType,
boolean setCurrentThreadContext) {
this.executionManager = executionManager;
this.durableConfig = durableConfig;
this.lambdaContext = lambdaContext;
Expand All @@ -45,8 +67,10 @@ protected BaseContextImpl(
this.isReplaying = executionManager.hasOperationsForContext(contextId);
this.threadType = threadType;

// write the thread id and type to thread local
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
if (setCurrentThreadContext) {
// write the thread id and type to thread local
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
}
}

// =============== accessors ================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,24 @@ private DurableContextImpl(
Context lambdaContext,
String contextId,
String contextName) {
super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT);
this(executionManager, durableConfig, lambdaContext, contextId, contextName, true);
}

private DurableContextImpl(
ExecutionManager executionManager,
DurableConfig durableConfig,
Context lambdaContext,
String contextId,
String contextName,
boolean setCurrentThreadContext) {
super(
executionManager,
durableConfig,
lambdaContext,
contextId,
contextName,
ThreadType.CONTEXT,
setCurrentThreadContext);
operationIdGenerator = new OperationIdGenerator(contextId);
}

Expand Down Expand Up @@ -98,6 +115,22 @@ public DurableContextImpl createChildContext(String childContextId, String child
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName);
}

/**
* Creates a child context without setting the current thread context.
*
* <p>Use this when the child context is being created on a thread that should not have its thread-local context
* overwritten (e.g. when constructing the context ahead of running it on a separate thread).
*
* @param childContextId the child context's ID (the CONTEXT operation's operation ID)
* @param childContextName the name of the child context
* @return a new DurableContext for the child context
*/
public DurableContextImpl createChildContextWithoutSettingThreadContext(
String childContextId, String childContextName) {
return new DurableContextImpl(
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName, false);
}

/**
* Creates a step context for executing step operations.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected ConcurrencyOperation(
this.toleratedFailureCount = toleratedFailureCount;
this.failureRateThreshold = failureRateThreshold;
this.operationIdGenerator = new OperationIdGenerator(getOperationId());
this.rootContext = durableContext.createChildContext(getOperationId(), getName());
this.rootContext = durableContext.createChildContextWithoutSettingThreadContext(getOperationId(), getName());
}

protected ConcurrencyOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package software.amazon.lambda.durable.operation;

import java.util.function.Function;
import software.amazon.awssdk.services.lambda.model.ContextOptions;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationType;
Expand Down Expand Up @@ -41,6 +42,8 @@
*/
public class ParallelOperation<T> extends ConcurrencyOperation<T> {

private boolean replaying = false;

public ParallelOperation(
OperationIdentifier operationIdentifier,
TypeToken<T> resultTypeToken,
Expand Down Expand Up @@ -78,9 +81,14 @@ protected <R> ChildContextOperation<R> createItem(

@Override
protected void handleSuccess() {
if (replaying) {
// Do not send checkpoint during replay
return;
}
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(getSubType().getValue()));
.subType(getSubType().getValue())
.contextOptions(ContextOptions.builder().replayChildren(true).build()));
}

@Override
Expand All @@ -97,8 +105,9 @@ protected void start() {

@Override
protected void replay(Operation existing) {
// Always replay child branches for parallel
start();
// No-op: child branches handle their own replay via ChildContextOperation.replay().
// Set replaying=true so handleSuccess() skips re-checkpointing the already-completed parallel context.
replaying = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.context;

import static org.junit.jupiter.api.Assertions.*;

import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.TestUtils;
import software.amazon.lambda.durable.execution.ExecutionManager;
import software.amazon.lambda.durable.execution.ThreadContext;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.DurableExecutionInput;

class BaseContextImplTest {

private static final String INVOCATION_ID = "20dae574-53da-37a1-bfd5-b0e2e6ec715d";
private static final String EXECUTION_NAME = "349beff4-a89d-4bc8-a56f-af7a8af67a5f";
private static final Operation EXECUTION_OP = Operation.builder()
.id(INVOCATION_ID)
.type(OperationType.EXECUTION)
.status(OperationStatus.STARTED)
.build();

@BeforeEach
void clearThreadContext() {
// currentThreadContext is a static ThreadLocal on ExecutionManager — clear it
// before each test to prevent bleed-through from other tests on the same thread.
createExecutionManager().setCurrentThreadContext(null);
}

private ExecutionManager createExecutionManager() {
var client = TestUtils.createMockClient();
var initialState = CheckpointUpdatedExecutionState.builder()
.operations(new ArrayList<>(List.of(EXECUTION_OP)))
.build();
return new ExecutionManager(
new DurableExecutionInput(
"arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
+ EXECUTION_NAME + "/" + INVOCATION_ID,
"test-token",
initialState),
DurableConfig.builder().withDurableExecutionClient(client).build());
}

@Test
void defaultConstructor_setsCurrentThreadContext() {
var executionManager = createExecutionManager();
// Precondition: no thread context set yet
assertNull(executionManager.getCurrentThreadContext());

// Creating a root context with the default constructor should set the thread context
DurableContextImpl.createRootContext(
executionManager, DurableConfig.builder().build(), null);

var threadContext = executionManager.getCurrentThreadContext();
assertNotNull(threadContext);
assertEquals(ThreadType.CONTEXT, threadContext.threadType());
assertNull(threadContext.threadId());
}

@Test
void constructorWithSetCurrentThreadContextTrue_setsCurrentThreadContext() {
var executionManager = createExecutionManager();

// createRootContext sets thread context to root (threadId=null)
var rootContext = DurableContextImpl.createRootContext(
executionManager, DurableConfig.builder().build(), null);
assertEquals(
ThreadType.CONTEXT, executionManager.getCurrentThreadContext().threadType());
assertNull(executionManager.getCurrentThreadContext().threadId());

// createChildContext (setCurrentThreadContext=true) should overwrite with child's context
rootContext.createChildContext("child-id", "child-name");

var threadContext = executionManager.getCurrentThreadContext();
assertNotNull(threadContext);
assertEquals(ThreadType.CONTEXT, threadContext.threadType());
assertEquals("child-id", threadContext.threadId());
}

@Test
void constructorWithSetCurrentThreadContextFalse_doesNotOverwriteThreadContext() {
var executionManager = createExecutionManager();

// Create root context first (it will set thread context to null/root)
var rootContext = DurableContextImpl.createRootContext(
executionManager, DurableConfig.builder().build(), null);

// Now set a sentinel — simulating a caller thread that already has context established
var sentinel = new ThreadContext("original-context", ThreadType.CONTEXT);
executionManager.setCurrentThreadContext(sentinel);

// createChildContextWithoutSettingThreadContext should NOT overwrite the sentinel
rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name");

// Thread context should still be the sentinel, not the child's context
var threadContext = executionManager.getCurrentThreadContext();
assertNotNull(threadContext);
assertEquals("original-context", threadContext.threadId());
}

@Test
void createChildContextWithoutSettingThreadContext_returnsValidChildContext() {
var executionManager = createExecutionManager();
executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
var rootContext = DurableContextImpl.createRootContext(
executionManager, DurableConfig.builder().build(), null);

var childContext = rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name");

assertNotNull(childContext);
assertEquals("child-id", childContext.getContextId());
assertEquals("child-name", childContext.getContextName());
}
}
Loading
Loading