diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java
new file mode 100644
index 000000000..a25464b48
--- /dev/null
+++ b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java
@@ -0,0 +1,69 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package software.amazon.lambda.durable.examples.parallel;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import software.amazon.lambda.durable.DurableContext;
+import software.amazon.lambda.durable.DurableFuture;
+import software.amazon.lambda.durable.DurableHandler;
+import software.amazon.lambda.durable.ParallelConfig;
+
+/**
+ * Example demonstrating parallel branches where some branches include wait operations.
+ *
+ *
This models a notification fan-out pattern where different channels have different delivery delays:
+ *
+ *
+ * - Email — sent immediately
+ *
- SMS — waits for a rate-limit window before sending
+ *
- Push notification — waits for a quiet-hours window before sending
+ *
+ *
+ * All three branches run concurrently. Branches with waits suspend without consuming compute resources and resume
+ * automatically once the wait elapses. The parallel operation completes once all branches finish.
+ */
+public class ParallelWithWaitExample
+ extends DurableHandler {
+
+ public record Input(String userId, String message) {}
+
+ public record Output(List deliveries) {}
+
+ @Override
+ public Output handleRequest(Input input, DurableContext context) {
+ var logger = context.getLogger();
+ logger.info("Sending notifications to user {}", input.userId());
+
+ var config = ParallelConfig.builder().build();
+ var futures = new ArrayList>(3);
+
+ try (var parallel = context.parallel("notify", config)) {
+
+ // Branch 1: email — no wait, deliver immediately
+ futures.add(parallel.branch("email", String.class, ctx -> {
+ ctx.wait("email-rate-limit-delay", Duration.ofSeconds(10));
+ return ctx.step("send-email", String.class, stepCtx -> "email:" + input.message());
+ }));
+
+ // Branch 2: SMS — wait for rate-limit window, then send
+ futures.add(parallel.branch("sms", String.class, ctx -> {
+ ctx.wait("sms-rate-limit-delay", Duration.ofSeconds(10));
+ return ctx.step("send-sms", String.class, stepCtx -> "sms:" + input.message());
+ }));
+
+ // Branch 3: push notification — wait for quiet-hours window, then send
+ futures.add(parallel.branch("push", String.class, ctx -> {
+ ctx.wait("push-quiet-delay", Duration.ofSeconds(10));
+ return ctx.step("send-push", String.class, stepCtx -> "push:" + input.message());
+ }));
+ }
+
+ var deliveries = futures.stream().map(DurableFuture::get).toList();
+ logger.info("All {} notifications delivered", deliveries.size());
+ // Test replay
+ context.wait("wait for finalization", Duration.ofSeconds(5));
+ return new Output(deliveries);
+ }
+}
diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExampleTest.java
new file mode 100644
index 000000000..4352ed23f
--- /dev/null
+++ b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExampleTest.java
@@ -0,0 +1,33 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package software.amazon.lambda.durable.examples.parallel;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import software.amazon.lambda.durable.model.ExecutionStatus;
+import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
+
+class ParallelWithWaitExampleTest {
+ @Test
+ void completesAfterManuallyAdvancingWaits() {
+ var handler = new ParallelWithWaitExample();
+ var runner = LocalDurableTestRunner.create(ParallelWithWaitExample.Input.class, handler);
+
+ var input = new ParallelWithWaitExample.Input("user-456", "world");
+
+ // First run suspends on wait branches
+ var first = runner.run(input);
+ assertEquals(ExecutionStatus.PENDING, first.getStatus());
+
+ // Advance waits and re-run to completion
+ runner.advanceTime();
+ var result = runner.runUntilComplete(input);
+
+ assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+
+ var output = result.getResult(ParallelWithWaitExample.Output.class);
+ assertEquals(List.of("email:world", "sms:world", "push:world"), output.deliveries());
+ }
+}
diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java
index 89902f6a4..67fe56834 100644
--- a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java
+++ b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java
@@ -37,6 +37,28 @@ protected BaseContextImpl(
String contextId,
String contextName,
ThreadType threadType) {
+ this(executionManager, durableConfig, lambdaContext, contextId, contextName, threadType, true);
+ }
+
+ /**
+ * Creates a new BaseContext instance.
+ *
+ * @param executionManager the execution manager for thread coordination and state management
+ * @param durableConfig the durable execution configuration
+ * @param lambdaContext the AWS Lambda runtime context
+ * @param contextId the context ID, null for root context, set for child contexts
+ * @param contextName the human-readable name for this context
+ * @param threadType the type of thread this context runs on
+ * @param setCurrentThreadContext whether to call setCurrentThreadContext on the execution manager
+ */
+ protected BaseContextImpl(
+ ExecutionManager executionManager,
+ DurableConfig durableConfig,
+ Context lambdaContext,
+ String contextId,
+ String contextName,
+ ThreadType threadType,
+ boolean setCurrentThreadContext) {
this.executionManager = executionManager;
this.durableConfig = durableConfig;
this.lambdaContext = lambdaContext;
@@ -45,8 +67,10 @@ protected BaseContextImpl(
this.isReplaying = executionManager.hasOperationsForContext(contextId);
this.threadType = threadType;
- // write the thread id and type to thread local
- executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
+ if (setCurrentThreadContext) {
+ // write the thread id and type to thread local
+ executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
+ }
}
// =============== accessors ================
diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java
index ff2bfbb63..8ffb94e84 100644
--- a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java
+++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java
@@ -67,7 +67,24 @@ private DurableContextImpl(
Context lambdaContext,
String contextId,
String contextName) {
- super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT);
+ this(executionManager, durableConfig, lambdaContext, contextId, contextName, true);
+ }
+
+ private DurableContextImpl(
+ ExecutionManager executionManager,
+ DurableConfig durableConfig,
+ Context lambdaContext,
+ String contextId,
+ String contextName,
+ boolean setCurrentThreadContext) {
+ super(
+ executionManager,
+ durableConfig,
+ lambdaContext,
+ contextId,
+ contextName,
+ ThreadType.CONTEXT,
+ setCurrentThreadContext);
operationIdGenerator = new OperationIdGenerator(contextId);
}
@@ -98,6 +115,22 @@ public DurableContextImpl createChildContext(String childContextId, String child
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName);
}
+ /**
+ * Creates a child context without setting the current thread context.
+ *
+ * Use this when the child context is being created on a thread that should not have its thread-local context
+ * overwritten (e.g. when constructing the context ahead of running it on a separate thread).
+ *
+ * @param childContextId the child context's ID (the CONTEXT operation's operation ID)
+ * @param childContextName the name of the child context
+ * @return a new DurableContext for the child context
+ */
+ public DurableContextImpl createChildContextWithoutSettingThreadContext(
+ String childContextId, String childContextName) {
+ return new DurableContextImpl(
+ getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName, false);
+ }
+
/**
* Creates a step context for executing step operations.
*
diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java
index 6071f6d14..b01274500 100644
--- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java
+++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java
@@ -74,7 +74,7 @@ protected ConcurrencyOperation(
this.toleratedFailureCount = toleratedFailureCount;
this.failureRateThreshold = failureRateThreshold;
this.operationIdGenerator = new OperationIdGenerator(getOperationId());
- this.rootContext = durableContext.createChildContext(getOperationId(), getName());
+ this.rootContext = durableContext.createChildContextWithoutSettingThreadContext(getOperationId(), getName());
}
protected ConcurrencyOperation(
diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java
index 55d4395e5..439c3183d 100644
--- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java
+++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java
@@ -3,6 +3,7 @@
package software.amazon.lambda.durable.operation;
import java.util.function.Function;
+import software.amazon.awssdk.services.lambda.model.ContextOptions;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationType;
@@ -41,6 +42,8 @@
*/
public class ParallelOperation extends ConcurrencyOperation {
+ private boolean replaying = false;
+
public ParallelOperation(
OperationIdentifier operationIdentifier,
TypeToken resultTypeToken,
@@ -78,9 +81,14 @@ protected ChildContextOperation createItem(
@Override
protected void handleSuccess() {
+ if (replaying) {
+ // Do not send checkpoint during replay
+ return;
+ }
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
- .subType(getSubType().getValue()));
+ .subType(getSubType().getValue())
+ .contextOptions(ContextOptions.builder().replayChildren(true).build()));
}
@Override
@@ -97,8 +105,9 @@ protected void start() {
@Override
protected void replay(Operation existing) {
- // Always replay child branches for parallel
- start();
+ // No-op: child branches handle their own replay via ChildContextOperation.replay().
+ // Set replaying=true so handleSuccess() skips re-checkpointing the already-completed parallel context.
+ replaying = true;
}
@Override
diff --git a/sdk/src/test/java/software/amazon/lambda/durable/context/BaseContextImplTest.java b/sdk/src/test/java/software/amazon/lambda/durable/context/BaseContextImplTest.java
new file mode 100644
index 000000000..c0d321ed5
--- /dev/null
+++ b/sdk/src/test/java/software/amazon/lambda/durable/context/BaseContextImplTest.java
@@ -0,0 +1,123 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package software.amazon.lambda.durable.context;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
+import software.amazon.awssdk.services.lambda.model.Operation;
+import software.amazon.awssdk.services.lambda.model.OperationStatus;
+import software.amazon.awssdk.services.lambda.model.OperationType;
+import software.amazon.lambda.durable.DurableConfig;
+import software.amazon.lambda.durable.TestUtils;
+import software.amazon.lambda.durable.execution.ExecutionManager;
+import software.amazon.lambda.durable.execution.ThreadContext;
+import software.amazon.lambda.durable.execution.ThreadType;
+import software.amazon.lambda.durable.model.DurableExecutionInput;
+
+class BaseContextImplTest {
+
+ private static final String INVOCATION_ID = "20dae574-53da-37a1-bfd5-b0e2e6ec715d";
+ private static final String EXECUTION_NAME = "349beff4-a89d-4bc8-a56f-af7a8af67a5f";
+ private static final Operation EXECUTION_OP = Operation.builder()
+ .id(INVOCATION_ID)
+ .type(OperationType.EXECUTION)
+ .status(OperationStatus.STARTED)
+ .build();
+
+ @BeforeEach
+ void clearThreadContext() {
+ // currentThreadContext is a static ThreadLocal on ExecutionManager — clear it
+ // before each test to prevent bleed-through from other tests on the same thread.
+ createExecutionManager().setCurrentThreadContext(null);
+ }
+
+ private ExecutionManager createExecutionManager() {
+ var client = TestUtils.createMockClient();
+ var initialState = CheckpointUpdatedExecutionState.builder()
+ .operations(new ArrayList<>(List.of(EXECUTION_OP)))
+ .build();
+ return new ExecutionManager(
+ new DurableExecutionInput(
+ "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
+ + EXECUTION_NAME + "/" + INVOCATION_ID,
+ "test-token",
+ initialState),
+ DurableConfig.builder().withDurableExecutionClient(client).build());
+ }
+
+ @Test
+ void defaultConstructor_setsCurrentThreadContext() {
+ var executionManager = createExecutionManager();
+ // Precondition: no thread context set yet
+ assertNull(executionManager.getCurrentThreadContext());
+
+ // Creating a root context with the default constructor should set the thread context
+ DurableContextImpl.createRootContext(
+ executionManager, DurableConfig.builder().build(), null);
+
+ var threadContext = executionManager.getCurrentThreadContext();
+ assertNotNull(threadContext);
+ assertEquals(ThreadType.CONTEXT, threadContext.threadType());
+ assertNull(threadContext.threadId());
+ }
+
+ @Test
+ void constructorWithSetCurrentThreadContextTrue_setsCurrentThreadContext() {
+ var executionManager = createExecutionManager();
+
+ // createRootContext sets thread context to root (threadId=null)
+ var rootContext = DurableContextImpl.createRootContext(
+ executionManager, DurableConfig.builder().build(), null);
+ assertEquals(
+ ThreadType.CONTEXT, executionManager.getCurrentThreadContext().threadType());
+ assertNull(executionManager.getCurrentThreadContext().threadId());
+
+ // createChildContext (setCurrentThreadContext=true) should overwrite with child's context
+ rootContext.createChildContext("child-id", "child-name");
+
+ var threadContext = executionManager.getCurrentThreadContext();
+ assertNotNull(threadContext);
+ assertEquals(ThreadType.CONTEXT, threadContext.threadType());
+ assertEquals("child-id", threadContext.threadId());
+ }
+
+ @Test
+ void constructorWithSetCurrentThreadContextFalse_doesNotOverwriteThreadContext() {
+ var executionManager = createExecutionManager();
+
+ // Create root context first (it will set thread context to null/root)
+ var rootContext = DurableContextImpl.createRootContext(
+ executionManager, DurableConfig.builder().build(), null);
+
+ // Now set a sentinel — simulating a caller thread that already has context established
+ var sentinel = new ThreadContext("original-context", ThreadType.CONTEXT);
+ executionManager.setCurrentThreadContext(sentinel);
+
+ // createChildContextWithoutSettingThreadContext should NOT overwrite the sentinel
+ rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name");
+
+ // Thread context should still be the sentinel, not the child's context
+ var threadContext = executionManager.getCurrentThreadContext();
+ assertNotNull(threadContext);
+ assertEquals("original-context", threadContext.threadId());
+ }
+
+ @Test
+ void createChildContextWithoutSettingThreadContext_returnsValidChildContext() {
+ var executionManager = createExecutionManager();
+ executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
+ var rootContext = DurableContextImpl.createRootContext(
+ executionManager, DurableConfig.builder().build(), null);
+
+ var childContext = rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name");
+
+ assertNotNull(childContext);
+ assertEquals("child-id", childContext.getContextId());
+ assertEquals("child-name", childContext.getContextName());
+ }
+}
diff --git a/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextImplTest.java b/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextImplTest.java
new file mode 100644
index 000000000..e3a5f732a
--- /dev/null
+++ b/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextImplTest.java
@@ -0,0 +1,93 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package software.amazon.lambda.durable.context;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
+import software.amazon.awssdk.services.lambda.model.Operation;
+import software.amazon.awssdk.services.lambda.model.OperationStatus;
+import software.amazon.awssdk.services.lambda.model.OperationType;
+import software.amazon.lambda.durable.DurableConfig;
+import software.amazon.lambda.durable.TestUtils;
+import software.amazon.lambda.durable.execution.ExecutionManager;
+import software.amazon.lambda.durable.execution.ThreadContext;
+import software.amazon.lambda.durable.execution.ThreadType;
+import software.amazon.lambda.durable.model.DurableExecutionInput;
+
+class DurableContextImplTest {
+
+ private static final String INVOCATION_ID = "20dae574-53da-37a1-bfd5-b0e2e6ec715d";
+ private static final String EXECUTION_NAME = "349beff4-a89d-4bc8-a56f-af7a8af67a5f";
+
+ private ExecutionManager executionManager;
+ private DurableContextImpl rootContext;
+
+ @BeforeEach
+ void setUp() {
+ var executionOp = Operation.builder()
+ .id(INVOCATION_ID)
+ .type(OperationType.EXECUTION)
+ .status(OperationStatus.STARTED)
+ .build();
+ var client = TestUtils.createMockClient();
+ var initialState = CheckpointUpdatedExecutionState.builder()
+ .operations(new ArrayList<>(List.of(executionOp)))
+ .build();
+ executionManager = new ExecutionManager(
+ new DurableExecutionInput(
+ "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
+ + EXECUTION_NAME + "/" + INVOCATION_ID,
+ "test-token",
+ initialState),
+ DurableConfig.builder().withDurableExecutionClient(client).build());
+ // Simulate the root thread context as the executor would set it
+ executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
+ rootContext = DurableContextImpl.createRootContext(
+ executionManager, DurableConfig.builder().build(), null);
+ }
+
+ @Test
+ void createChildContext_setsThreadContextToChild() {
+ rootContext.createChildContext("child-1", "my-child");
+
+ var threadContext = executionManager.getCurrentThreadContext();
+ assertNotNull(threadContext);
+ assertEquals("child-1", threadContext.threadId());
+ assertEquals(ThreadType.CONTEXT, threadContext.threadType());
+ }
+
+ @Test
+ void createChildContextWithoutSettingThreadContext_preservesCallerThreadContext() {
+ var callerContext = new ThreadContext("caller-thread", ThreadType.CONTEXT);
+ executionManager.setCurrentThreadContext(callerContext);
+
+ rootContext.createChildContextWithoutSettingThreadContext("child-1", "my-child");
+
+ // Thread context must remain unchanged
+ var threadContext = executionManager.getCurrentThreadContext();
+ assertEquals("caller-thread", threadContext.threadId());
+ }
+
+ @Test
+ void createChildContextWithoutSettingThreadContext_returnsCorrectChildMetadata() {
+ var child = rootContext.createChildContextWithoutSettingThreadContext("child-42", "child-name");
+
+ assertEquals("child-42", child.getContextId());
+ assertEquals("child-name", child.getContextName());
+ }
+
+ @Test
+ void createChildContextWithoutSettingThreadContext_whenNoThreadContextSet_leavesItNull() {
+ // Clear any existing thread context
+ executionManager.setCurrentThreadContext(null);
+
+ rootContext.createChildContextWithoutSettingThreadContext("child-1", "my-child");
+
+ assertNull(executionManager.getCurrentThreadContext());
+ }
+}
diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java
index a55d1c139..bf6179c26 100644
--- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java
+++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java
@@ -63,6 +63,8 @@ void setUp() {
.withExecutorService(Executors.newCachedThreadPool())
.build());
when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext);
+ when(durableContext.createChildContextWithoutSettingThreadContext(anyString(), anyString()))
+ .thenReturn(childContext);
when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT));
mockIdGenerator = mock(OperationIdGenerator.class);
when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet());
diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java
index a9983accd..e537a54e1 100644
--- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java
+++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java
@@ -60,6 +60,8 @@ void setUp() {
.withExecutorService(Executors.newCachedThreadPool())
.build());
when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext);
+ when(durableContext.createChildContextWithoutSettingThreadContext(anyString(), anyString()))
+ .thenReturn(childContext);
when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT));
// Default: no existing operations (fresh execution)
mockIdGenerator = mock(OperationIdGenerator.class);
@@ -197,6 +199,100 @@ void contextHierarchy_branchesUseParallelContextAsParent() throws Exception {
assertNotNull(childOp);
}
+ // ===== Replay =====
+
+ @Test
+ void replay_doesNotSendStartCheckpoint() throws Exception {
+ // Simulate the parallel operation already existing in the service (STARTED status)
+ when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID))
+ .thenReturn(Operation.builder()
+ .id(OPERATION_ID)
+ .name("test-parallel")
+ .type(OperationType.CONTEXT)
+ .subType(OperationSubType.PARALLEL.getValue())
+ .status(OperationStatus.STARTED)
+ .build());
+ // Both branches already succeeded
+ when(executionManager.getOperationAndUpdateReplayState("child-1"))
+ .thenReturn(Operation.builder()
+ .id("child-1")
+ .name("branch-1")
+ .type(OperationType.CONTEXT)
+ .subType(OperationSubType.PARALLEL_BRANCH.getValue())
+ .status(OperationStatus.SUCCEEDED)
+ .contextDetails(
+ ContextDetails.builder().result("\"r1\"").build())
+ .build());
+ when(executionManager.getOperationAndUpdateReplayState("child-2"))
+ .thenReturn(Operation.builder()
+ .id("child-2")
+ .name("branch-2")
+ .type(OperationType.CONTEXT)
+ .subType(OperationSubType.PARALLEL_BRANCH.getValue())
+ .status(OperationStatus.SUCCEEDED)
+ .contextDetails(
+ ContextDetails.builder().result("\"r2\"").build())
+ .build());
+
+ var op = createOperation(-1, -1, 0);
+ setOperationIdGenerator(op, mockIdGenerator);
+ op.execute();
+ op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES);
+ op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES);
+
+ runJoin(op);
+
+ verify(executionManager, never())
+ .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START));
+ verify(executionManager, never())
+ .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED));
+ }
+
+ @Test
+ void replay_doesNotSendSucceedCheckpointWhenParallelAlreadySucceeded() throws Exception {
+ when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID))
+ .thenReturn(Operation.builder()
+ .id(OPERATION_ID)
+ .name("test-parallel")
+ .type(OperationType.CONTEXT)
+ .subType(OperationSubType.PARALLEL.getValue())
+ .status(OperationStatus.SUCCEEDED)
+ .build());
+ when(executionManager.getOperationAndUpdateReplayState("child-1"))
+ .thenReturn(Operation.builder()
+ .id("child-1")
+ .name("branch-1")
+ .type(OperationType.CONTEXT)
+ .subType(OperationSubType.PARALLEL_BRANCH.getValue())
+ .status(OperationStatus.SUCCEEDED)
+ .contextDetails(
+ ContextDetails.builder().result("\"r1\"").build())
+ .build());
+ when(executionManager.getOperationAndUpdateReplayState("child-2"))
+ .thenReturn(Operation.builder()
+ .id("child-2")
+ .name("branch-2")
+ .type(OperationType.CONTEXT)
+ .subType(OperationSubType.PARALLEL_BRANCH.getValue())
+ .status(OperationStatus.SUCCEEDED)
+ .contextDetails(
+ ContextDetails.builder().result("\"r2\"").build())
+ .build());
+
+ var op = createOperation(-1, -1, 0);
+ setOperationIdGenerator(op, mockIdGenerator);
+ op.execute();
+ op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES);
+ op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES);
+
+ runJoin(op);
+
+ verify(executionManager, never())
+ .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START));
+ verify(executionManager, never())
+ .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED));
+ }
+
// ===== handleFailure still sends SUCCEED =====
@Test