diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/ParallelExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelExample.java similarity index 97% rename from examples/src/main/java/software/amazon/lambda/durable/examples/ParallelExample.java rename to examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelExample.java index 0c9871c3..d9d79d3e 100644 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/ParallelExample.java +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelExample.java @@ -1,6 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; +package software.amazon.lambda.durable.examples.parallel; import java.util.ArrayList; import java.util.List; diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelFailureToleranceExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelFailureToleranceExample.java new file mode 100644 index 00000000..38f4b903 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelFailureToleranceExample.java @@ -0,0 +1,78 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.parallel; + +import java.util.ArrayList; +import java.util.List; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableFuture; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.ParallelConfig; +import software.amazon.lambda.durable.StepConfig; +import software.amazon.lambda.durable.retry.RetryStrategies; + +/** + * Example demonstrating parallel execution with failure tolerance. + * + *

When {@code toleratedFailureCount} is set, the parallel operation completes successfully even if some branches + * fail — as long as the number of failures does not exceed the threshold. Failed branches produce {@code null} results + * that callers can filter out. + * + *

Use this pattern when partial success is acceptable, for example: sending notifications to multiple channels where + * some channels may be unavailable. + */ +public class ParallelFailureToleranceExample + extends DurableHandler { + + public record Input(List services, int toleratedFailures) {} + + public record Output(List succeeded, List failed) {} + + @Override + public Output handleRequest(Input input, DurableContext context) { + var logger = context.getLogger(); + logger.info("Starting parallel execution with toleratedFailureCount={}", input.toleratedFailures()); + + var config = ParallelConfig.builder() + .toleratedFailureCount(input.toleratedFailures()) + .build(); + + var futures = new ArrayList>(input.services().size()); + + try (var parallel = context.parallel("call-services", config)) { + for (var service : input.services()) { + var future = parallel.branch("call-" + service, String.class, branchCtx -> { + return branchCtx.step( + "invoke-" + service, + String.class, + stepCtx -> { + if (service.startsWith("bad-")) { + throw new RuntimeException("Service unavailable: " + service); + } + return "ok:" + service; + }, + StepConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build()); + }); + futures.add(future); + } + } + + var succeeded = new ArrayList(); + var failed = new ArrayList(); + + for (int i = 0; i < futures.size(); i++) { + try { + var result = futures.get(i).get(); + succeeded.add(result); + } catch (Exception e) { + failed.add(input.services().get(i)); + logger.info("Branch failed for service {}: {}", input.services().get(i), e.getMessage()); + } + } + + logger.info("Completed: {} succeeded, {} failed", succeeded.size(), failed.size()); + return new Output(succeeded, failed); + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/ParallelExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelExampleTest.java similarity index 97% rename from examples/src/test/java/software/amazon/lambda/durable/examples/ParallelExampleTest.java rename to examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelExampleTest.java index 999c23d4..68bba068 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/ParallelExampleTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelExampleTest.java @@ -1,6 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; +package software.amazon.lambda.durable.examples.parallel; import static org.junit.jupiter.api.Assertions.*; diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelFailureToleranceExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelFailureToleranceExampleTest.java new file mode 100644 index 00000000..2b57970e --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelFailureToleranceExampleTest.java @@ -0,0 +1,63 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.parallel; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.List; +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class ParallelFailureToleranceExampleTest { + + @Test + void succeedsWhenFailuresAreWithinTolerance() { + var handler = new ParallelFailureToleranceExample(); + var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler); + + // 2 good services, 1 bad — toleratedFailureCount=1 so the parallel op still succeeds + var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "svc-c"), 1); + var result = runner.runUntilComplete(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var output = result.getResult(ParallelFailureToleranceExample.Output.class); + assertEquals(2, output.succeeded().size()); + assertEquals(1, output.failed().size()); + assertTrue(output.succeeded().contains("ok:svc-a")); + assertTrue(output.succeeded().contains("ok:svc-c")); + assertTrue(output.failed().contains("bad-svc-b")); + } + + @Test + void succeedsWhenAllBranchesSucceed() { + var handler = new ParallelFailureToleranceExample(); + var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler); + + var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "svc-b", "svc-c"), 2); + var result = runner.runUntilComplete(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var output = result.getResult(ParallelFailureToleranceExample.Output.class); + assertEquals(3, output.succeeded().size()); + assertTrue(output.failed().isEmpty()); + } + + @Test + void failsWhenFailuresExceedTolerance() { + var handler = new ParallelFailureToleranceExample(); + var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler); + + // 2 bad services, toleratedFailureCount=1 — second failure exceeds tolerance + var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "bad-svc-c"), 1); + var result = runner.runUntilComplete(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var output = result.getResult(ParallelFailureToleranceExample.Output.class); + assertEquals(2, output.failed().size()); + assertEquals(1, output.succeeded().size()); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index 91388b8c..55d4395e 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -3,7 +3,6 @@ package software.amazon.lambda.durable.operation; import java.util.function.Function; -import software.amazon.awssdk.services.lambda.model.ErrorObject; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationAction; import software.amazon.awssdk.services.lambda.model.OperationType; @@ -86,12 +85,7 @@ protected void handleSuccess() { @Override protected void handleFailure(ConcurrencyCompletionStatus concurrencyCompletionStatus) { - sendOperationUpdate(OperationUpdate.builder() - .action(OperationAction.FAIL) - .subType(getSubType().getValue()) - .error(ErrorObject.builder() - .errorMessage("Parallel operation failed with " + concurrencyCompletionStatus + " status") - .build())); + handleSuccess(); } @Override diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index 00345e60..a9983acc 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -197,6 +197,38 @@ void contextHierarchy_branchesUseParallelContextAsParent() throws Exception { assertNotNull(childOp); } + // ===== handleFailure still sends SUCCEED ===== + + @Test + void handleFailure_sendsSucceedCheckpointEvenWhenFailureToleranceExceeded() throws Exception { + // toleratedFailureCount=0, so the first failure triggers handleFailure + // ParallelOperation.handleFailure() delegates to handleSuccess(), so SUCCEED must be sent + when(executionManager.getOperationAndUpdateReplayState("child-1")) + .thenReturn(Operation.builder() + .id("child-1") + .name("branch-1") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.FAILED) + .build()); + + var op = createOperation(-1, -1, 0); + setOperationIdGenerator(op, mockIdGenerator); + op.addItem( + "branch-1", + ctx -> { + throw new RuntimeException("branch failed"); + }, + TypeToken.get(String.class), + SER_DES); + + runJoin(op); + + verify(executionManager).sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.FAIL)); + } + // ===== Helpers ===== private void runJoin(ParallelOperation op) throws InterruptedException {