Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples;
package software.amazon.lambda.durable.examples.parallel;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.parallel;

import java.util.ArrayList;
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelConfig;
import software.amazon.lambda.durable.StepConfig;
import software.amazon.lambda.durable.retry.RetryStrategies;

/**
 * Example demonstrating parallel execution with failure tolerance.
 *
 * <p>The handler starts one parallel branch per entry in {@code Input.services()}, configuring the parallel
 * operation with {@code toleratedFailureCount} taken from {@code Input.toleratedFailures()}. Each branch runs a
 * single step (no retries) that throws for service names starting with {@code "bad-"} and otherwise returns
 * {@code "ok:" + service}.
 *
 * <p>After the parallel block is closed, every branch future is read individually. A branch whose
 * {@code get()} throws is recorded by service name in {@code Output.failed()}; all other results go to
 * {@code Output.succeeded()}. Failed branches therefore surface as exceptions from the future, not as
 * {@code null} results.
 *
 * <p>NOTE(review): the accompanying tests expect the execution to end with SUCCEEDED even when the number of
 * failed branches exceeds the tolerated count — confirm the intended threshold semantics of
 * {@code toleratedFailureCount}.
 *
 * <p>Use this pattern when partial success is acceptable, for example: sending notifications to multiple channels
 * where some channels may be unavailable.
 */
public class ParallelFailureToleranceExample
extends DurableHandler<ParallelFailureToleranceExample.Input, ParallelFailureToleranceExample.Output> {

/**
 * Handler input.
 *
 * @param services names of the services to call, one parallel branch each
 * @param toleratedFailures value passed to {@code ParallelConfig.toleratedFailureCount}
 */
public record Input(List<String> services, int toleratedFailures) {}

/**
 * Handler output.
 *
 * @param succeeded results returned by branches whose future completed normally
 * @param failed names of the services whose branch future threw
 */
public record Output(List<String> succeeded, List<String> failed) {}

/**
 * Calls every service in its own parallel branch and partitions the outcomes into succeeded and failed.
 *
 * @param input services to call and the tolerated failure count
 * @param context durable context used for logging and for creating the parallel operation
 * @return the successful branch results and the names of the services whose branch failed
 */
@Override
public Output handleRequest(Input input, DurableContext context) {
var logger = context.getLogger();
logger.info("Starting parallel execution with toleratedFailureCount={}", input.toleratedFailures());

var config = ParallelConfig.builder()
.toleratedFailureCount(input.toleratedFailures())
.build();

// futures.get(i) corresponds to input.services().get(i); the result loop below relies on this ordering.
var futures = new ArrayList<DurableFuture<String>>(input.services().size());

// NOTE(review): presumably closing the parallel scope waits for the branches to finish — confirm.
try (var parallel = context.parallel("call-services", config)) {
for (var service : input.services()) {
var future = parallel.branch("call-" + service, String.class, branchCtx -> {
return branchCtx.step(
"invoke-" + service,
String.class,
stepCtx -> {
// Simulated outage: any service whose name starts with "bad-" fails.
if (service.startsWith("bad-")) {
throw new RuntimeException("Service unavailable: " + service);
}
return "ok:" + service;
},
// NO_RETRY so a failing step fails its branch without retry attempts.
StepConfig.builder()
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
});
futures.add(future);
}
}

var succeeded = new ArrayList<String>();
var failed = new ArrayList<String>();

// Read each branch result on its own so one failed branch does not hide the others.
for (int i = 0; i < futures.size(); i++) {
try {
var result = futures.get(i).get();
succeeded.add(result);
} catch (Exception e) {
// A throwing future marks the branch as failed; record the service name, not the exception.
failed.add(input.services().get(i));
logger.info("Branch failed for service {}: {}", input.services().get(i), e.getMessage());
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a wait operation here to make sure replay works

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.info("Completed: {} succeeded, {} failed", succeeded.size(), failed.size());
return new Output(succeeded, failed);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples;
package software.amazon.lambda.durable.examples.parallel;

import static org.junit.jupiter.api.Assertions.*;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.parallel;

import static org.junit.jupiter.api.Assertions.*;

import java.util.List;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/**
 * Tests for {@link ParallelFailureToleranceExample} executed through {@link LocalDurableTestRunner}.
 *
 * <p>Every scenario here expects the execution to end with {@code SUCCEEDED}; branch failures are observed
 * only through the handler's {@code Output.failed()} list.
 */
class ParallelFailureToleranceExampleTest {

    /** One failing branch with a tolerated failure count of 1: two results succeed, one service is reported failed. */
    @Test
    void succeedsWhenFailuresAreWithinTolerance() {
        var handler = new ParallelFailureToleranceExample();
        var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler);

        // 2 good services, 1 bad — toleratedFailureCount=1 so the parallel op still succeeds
        var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "svc-c"), 1);
        var result = runner.runUntilComplete(input);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var output = result.getResult(ParallelFailureToleranceExample.Output.class);
        assertEquals(2, output.succeeded().size());
        assertEquals(1, output.failed().size());
        assertTrue(output.succeeded().contains("ok:svc-a"));
        assertTrue(output.succeeded().contains("ok:svc-c"));
        assertTrue(output.failed().contains("bad-svc-b"));
    }

    /** No failing branches: all three results succeed and the failed list is empty. */
    @Test
    void succeedsWhenAllBranchesSucceed() {
        var handler = new ParallelFailureToleranceExample();
        var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler);

        var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "svc-b", "svc-c"), 2);
        var result = runner.runUntilComplete(input);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var output = result.getResult(ParallelFailureToleranceExample.Output.class);
        assertEquals(3, output.succeeded().size());
        assertTrue(output.failed().isEmpty());
    }

    /**
     * Two failing branches with a tolerated failure count of 1.
     *
     * <p>This test was previously named {@code failsWhenFailuresExceedTolerance}, but it asserts that the
     * execution status is {@code SUCCEEDED}; the name now matches what is actually verified.
     */
    @Test
    void succeedsAndReportsFailuresWhenFailuresExceedTolerance() {
        var handler = new ParallelFailureToleranceExample();
        var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler);

        // 2 bad services, toleratedFailureCount=1 — the second failure exceeds the tolerance, yet the
        // execution is still expected to complete and the handler reports both failed services
        var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "bad-svc-c"), 1);
        var result = runner.runUntilComplete(input);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var output = result.getResult(ParallelFailureToleranceExample.Output.class);
        assertEquals(2, output.failed().size());
        assertEquals(1, output.succeeded().size());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package software.amazon.lambda.durable.operation;

import java.util.function.Function;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationType;
Expand Down Expand Up @@ -86,12 +85,7 @@ protected void handleSuccess() {

/**
 * Completes the parallel operation through {@link #handleSuccess()} even when the completion status
 * indicates failure.
 *
 * <p>No {@code FAIL} operation update is sent from here: the unit test
 * {@code handleFailure_sendsSucceedCheckpointEvenWhenFailureToleranceExceeded} requires that only a
 * {@code SUCCEED} update is emitted for this path. Sending a {@code FAIL} update before delegating would
 * emit two conflicting updates for the same operation.
 *
 * @param concurrencyCompletionStatus completion status that triggered the failure path; not used here
 */
@Override
protected void handleFailure(ConcurrencyCompletionStatus concurrencyCompletionStatus) {
    handleSuccess();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,38 @@ void contextHierarchy_branchesUseParallelContextAsParent() throws Exception {
assertNotNull(childOp);
}

// ===== handleFailure still sends SUCCEED =====

@Test
void handleFailure_sendsSucceedCheckpointEvenWhenFailureToleranceExceeded() throws Exception {
    // The replay state reports the only branch as a FAILED parallel-branch context.
    var failedBranch = Operation.builder()
            .id("child-1")
            .name("branch-1")
            .type(OperationType.CONTEXT)
            .subType(OperationSubType.PARALLEL_BRANCH.getValue())
            .status(OperationStatus.FAILED)
            .build();
    when(executionManager.getOperationAndUpdateReplayState("child-1")).thenReturn(failedBranch);

    // toleratedFailureCount=0, so the very first branch failure routes through handleFailure.
    var parallelOp = createOperation(-1, -1, 0);
    setOperationIdGenerator(parallelOp, mockIdGenerator);

    var resultType = TypeToken.get(String.class);
    parallelOp.addItem(
            "branch-1",
            ctx -> {
                throw new RuntimeException("branch failed");
            },
            resultType,
            SER_DES);

    runJoin(parallelOp);

    // handleFailure() delegates to handleSuccess(): a SUCCEED update is expected and a FAIL update never.
    verify(executionManager).sendOperationUpdate(argThat(sent -> OperationAction.SUCCEED == sent.action()));
    verify(executionManager, never())
            .sendOperationUpdate(argThat(sent -> OperationAction.FAIL == sent.action()));
}

// ===== Helpers =====

private void runJoin(ParallelOperation<?> op) throws InterruptedException {
Expand Down
Loading