diff --git a/AGENTS.md b/AGENTS.md index e772825d..a9d0fdf8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -201,12 +201,14 @@ void testAgainstRealLambda() { | Class | Responsibility | |-------|----------------| | `DurableHandler` | Lambda entry point, extend this | -| `DurableContext` | User API: `step()`, `wait()` | +| `DurableContext` | User API: `step()`, `wait()`, `map()` | | `DurableExecutor` | Orchestrates execution lifecycle | | `ExecutionManager` | Thread coordination, state management | | `CheckpointBatcher` | Batches checkpoint API calls (750KB limit) | | `StepOperation` | Executes steps with retry logic | | `WaitOperation` | Handles wait checkpointing | +| `MapOperation` | Applies a function across items concurrently via child contexts | +| `BaseConcurrentOperation` | Shared base for map/parallel: concurrency limiting, completion evaluation | ## Common Tasks diff --git a/README.md b/README.md index d5fb450a..a6ddd7c6 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Build resilient, long-running AWS Lambda functions that automatically checkpoint - **Configurable Retries** – Built-in retry strategies with exponential backoff and jitter - **Replay Safety** – Functions deterministically resume from checkpoints after interruptions - **Type Safety** – Full generic type support for step results +- **Data-Driven Concurrency** – Apply a function across a collection with `map()`, with per-item error isolation and configurable completion criteria ## How It Works @@ -30,6 +31,7 @@ Your durable function extends `DurableHandler` and implements `handleReque - `ctx.waitForCallback()` – Simplify callback handling by combining callback creation and submission in one operation - `ctx.invoke()` – Invoke another Lambda function and wait for the result - `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log +- `ctx.map()` – Apply a function to each item in a collection concurrently ## Quick Start @@ -91,6 +93,7 @@ See [Deploy Lambda durable functions with Infrastructure as Code](https://docs.a - [Callbacks](docs/core/callbacks.md) - Wait for external systems to respond - [Invoke](docs/core/invoke.md) - Call other durable functions - [Child Contexts](docs/core/child-contexts.md) - Organize complex workflows into isolated units +- [Map](docs/core/map.md) - Apply a function across a collection concurrently **Examples** diff --git a/docs/core/map.md b/docs/core/map.md new file mode 100644 index 00000000..3c564f54 --- /dev/null +++ b/docs/core/map.md @@ -0,0 +1,153 @@ +## map() – Data-Driven Concurrent Execution + +`map()` applies a function to each item in a collection concurrently, with each item running in its own child context. Results are collected into a `BatchResult` that maintains input order. + +```java +// Basic map: process items concurrently +var items = List.of("order-1", "order-2", "order-3"); +var result = ctx.map("process-orders", items, OrderResult.class, (childCtx, orderId, index) -> { + return childCtx.step("fetch-" + index, OrderResult.class, + stepCtx -> orderService.process(orderId)); +}); + +// Results maintain input order +OrderResult first = result.getResult(0); // result for "order-1" +OrderResult second = result.getResult(1); // result for "order-2" +assertTrue(result.allSucceeded()); +``` + +Each item's function receives its own `DurableContext`, so you can use any durable operation (`step()`, `wait()`, `invoke()`, etc.) inside the map function. + +### mapAsync() – Non-Blocking Map + +`mapAsync()` starts the map operation without blocking, returning a `DurableFuture>`: + +```java +var future = ctx.mapAsync("process-orders", items, OrderResult.class, (childCtx, orderId, index) -> { + return childCtx.step("process-" + index, OrderResult.class, stepCtx -> process(orderId)); +}); + +// Do other work while map runs... +var otherResult = ctx.step("other-work", String.class, stepCtx -> doOtherWork()); + +// Block when you need the results +BatchResult result = future.get(); +``` + +### BatchResult + +`BatchResult` holds ordered results and errors from the map operation: + +| Method | Description | +|--------|-------------| +| `getResult(i)` | Result at index `i`, or `null` if that item failed | +| `getError(i)` | Error at index `i`, or `null` if that item succeeded | +| `allSucceeded()` | `true` if every item succeeded | +| `size()` | Number of items in the batch | +| `results()` | All results as an unmodifiable list | +| `succeeded()` | Only the non-null (successful) results | +| `failed()` | Only the non-null errors | +| `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) | + +### Error Isolation + +One item's failure does not prevent other items from completing. Failed items are captured in the `BatchResult` at their corresponding index: + +```java +var result = ctx.map("risky-work", items, String.class, (childCtx, item, index) -> { + if (item.equals("bad")) throw new RuntimeException("failed"); + return item.toUpperCase(); +}); + +assertFalse(result.allSucceeded()); +assertNotNull(result.getError(1)); // the failed item +assertEquals("A", result.getResult(0)); // other items still succeed +``` + +### MapConfig + +Configure concurrency limits and completion criteria with `MapConfig`: + +```java +var config = MapConfig.builder() + .maxConcurrency(5) // at most 5 items run at once + .completionConfig(CompletionConfig.allCompleted()) // default: run all items + .build(); + +var result = ctx.map("process-orders", items, OrderResult.class, + (childCtx, orderId, index) -> process(childCtx, orderId), config); +``` + +#### Concurrency Limiting + +`maxConcurrency` controls how many items execute concurrently. When set, items beyond the limit are queued and started as earlier items complete. Default is `null` (unlimited). + +```java +// Sequential execution: one item at a time +var sequential = MapConfig.builder().maxConcurrency(1).build(); + +// Limited concurrency +var limited = MapConfig.builder().maxConcurrency(3).build(); +``` + +#### CompletionConfig + +`CompletionConfig` controls when the map operation stops starting new items: + +| Factory Method | Behavior | +|----------------|----------| +| `allCompleted()` (default) | All items run regardless of failures | +| `allSuccessful()` | Stop if any item fails (zero failures tolerated) | +| `firstSuccessful()` | Stop after the first item succeeds | +| `minSuccessful(n)` | Stop after `n` items succeed | +| `toleratedFailureCount(n)` | Stop after more than `n` failures | +| `toleratedFailurePercentage(p)` | Stop when failure rate exceeds `p` (0.0–1.0) | + +```java +// Stop after 2 successes +var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.minSuccessful(2)) + .build(); + +var result = ctx.map("find-two", items, String.class, fn, config); +assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); +``` + +When early termination triggers, items that were never started have `null` for both result and error in the `BatchResult`. + +### Checkpoint-and-Replay + +Map operations are fully durable. On replay after interruption: + +- Completed items return cached results without re-execution +- Incomplete items resume from their last checkpoint +- Items that never started execute fresh + +Small results (< 256KB) are checkpointed directly. Large results are reconstructed from individual child context checkpoints on replay. + +### Input Collection Requirements + +The input collection must have deterministic iteration order. `List`, `LinkedList`, and `TreeSet` are accepted. `HashSet` and unordered map views are rejected with `IllegalArgumentException`. + +```java +// OK +ctx.map("work", List.of("a", "b"), String.class, fn); +ctx.map("work", new ArrayList<>(items), String.class, fn); + +// Throws IllegalArgumentException +ctx.map("work", new HashSet<>(items), String.class, fn); +``` + +### MapFunction Interface + +The function passed to `map()` is a `MapFunction`: + +```java +@FunctionalInterface +public interface MapFunction { + O apply(DurableContext context, I item, int index) throws Exception; +} +``` + +The `index` parameter is the zero-based position of the item in the input collection, useful for naming operations or correlating results. diff --git a/docs/design.md b/docs/design.md index 6c00be3f..606c3e44 100644 --- a/docs/design.md +++ b/docs/design.md @@ -59,6 +59,17 @@ DurableFuture invokeAsync(String name, String functionName, U payload, Class< DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType) DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType, InvokeConfig config) +// Map +BatchResult map(String name, Collection items, Class resultType, MapFunction function) +BatchResult map(String name, Collection items, Class resultType, MapFunction function, MapConfig config) +BatchResult map(String name, Collection items, TypeToken resultType, MapFunction function) +BatchResult map(String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config) + +DurableFuture> mapAsync(String name, Collection items, Class resultType, MapFunction function) +DurableFuture> mapAsync(String name, Collection items, Class resultType, MapFunction function, MapConfig config) +DurableFuture> mapAsync(String name, Collection items, TypeToken resultType, MapFunction function) +DurableFuture> mapAsync(String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config) + // Lambda context access Context getLambdaContext() ``` diff --git a/examples/README.md b/examples/README.md index 3ebeac2d..48ba579d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -89,6 +89,7 @@ mvn test -Dtest=CloudBasedIntegrationTest \ | [RetryInProcessExample](src/main/java/com/amazonaws/lambda/durable/examples/RetryInProcessExample.java) | In-process retry with concurrent operations | | [WaitAtLeastInProcessExample](src/main/java/com/amazonaws/lambda/durable/examples/WaitAtLeastInProcessExample.java) | Wait completes before async step (no suspension) | | [ManyAsyncStepsExample](src/main/java/com/amazonaws/lambda/durable/examples/ManyAsyncStepsExample.java) | Performance test with 500 concurrent async steps | +| [SimpleMapExample](src/main/java/com/amazonaws/lambda/durable/examples/SimpleMapExample.java) | Concurrent map over a collection with durable steps | ## Cleanup diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java new file mode 100644 index 00000000..ae78e924 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java @@ -0,0 +1,73 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples; + +import java.util.List; +import java.util.stream.Collectors; +import software.amazon.lambda.durable.CompletionConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.MapConfig; + +/** + * Example demonstrating MapConfig options: concurrency limiting and completion strategies. + * + *

This handler runs two map operations to showcase different configurations: + * + *

    + *
  1. Sequential processing with {@code maxConcurrency(1)} — items run one at a time + *
  2. Early termination with {@code minSuccessful(2)} — stops after 2 items succeed + *
+ */ +public class MapConfigExample extends DurableHandler { + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + var name = input.getName(); + context.getLogger().info("Starting map config example for {}", name); + + // Part 1: Sequential execution with maxConcurrency=1 + var items = List.of("alpha", "beta", "gamma"); + var sequentialConfig = MapConfig.builder().maxConcurrency(1).build(); + + var sequentialResult = context.map( + "sequential-processing", + items, + String.class, + (ctx, item, index) -> { + return ctx.step("transform-" + index, String.class, stepCtx -> item.toUpperCase() + "-" + name); + }, + sequentialConfig); + + var sequentialOutput = String.join(", ", sequentialResult.results()); + context.getLogger().info("Sequential result: {}", sequentialOutput); + + // Part 2: Early termination with minSuccessful(2) + var candidates = List.of("server-1", "server-2", "server-3", "server-4", "server-5"); + var earlyTermConfig = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.minSuccessful(2)) + .build(); + + var earlyTermResult = context.map( + "find-healthy-servers", + candidates, + String.class, + (ctx, server, index) -> { + return ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"); + }, + earlyTermConfig); + + context.getLogger() + .info( + "Early termination: reason={}, succeeded={}", + earlyTermResult.completionReason(), + earlyTermResult.succeeded().size()); + + var healthyServers = earlyTermResult.succeeded().stream().collect(Collectors.joining(", ")); + + return String.format( + "sequential=[%s] | earlyTerm=[%s] reason=%s", + sequentialOutput, healthyServers, earlyTermResult.completionReason()); + } +} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java new file mode 100644 index 00000000..0988eeca --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java @@ -0,0 +1,78 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples; + +import java.util.List; +import java.util.stream.Collectors; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.StepConfig; +import software.amazon.lambda.durable.retry.RetryStrategies; + +/** + * Example demonstrating error handling with the map operation. + * + *

Shows how individual item failures are isolated and captured in the {@code BatchResult}, while other items + * continue to succeed. Demonstrates inspecting partial results using {@code allSucceeded()}, {@code getError()}, + * {@code succeeded()}, and {@code failed()}. + * + *

    + *
  1. Map over a list of order IDs concurrently + *
  2. Some orders intentionally fail to simulate real-world partial failures + *
  3. Inspect the BatchResult to handle successes and failures separately + *
+ */ +public class MapErrorHandlingExample extends DurableHandler { + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + var name = input.getName(); + context.getLogger().info("Starting map error handling example for {}", name); + + var orderIds = List.of("order-1", "order-INVALID", "order-3", "order-ERROR", "order-5"); + + // Map over orders — some will fail, but others continue processing + var result = context.map("process-orders", orderIds, String.class, (ctx, orderId, index) -> { + return ctx.step( + "process-" + index, + String.class, + stepCtx -> { + if (orderId.contains("INVALID")) { + throw new IllegalArgumentException("Invalid order: " + orderId); + } + if (orderId.contains("ERROR")) { + throw new RuntimeException("Processing error for: " + orderId); + } + return "Processed " + orderId + " for " + name; + }, + StepConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build()); + }); + + context.getLogger() + .info( + "Map completed: allSucceeded={}, succeeded={}, failed={}", + result.allSucceeded(), + result.succeeded().size(), + result.failed().size()); + + // Build a summary showing successful results and error messages + var successSummary = result.succeeded().stream().collect(Collectors.joining(", ")); + + var errorSummary = new StringBuilder(); + for (int i = 0; i < result.size(); i++) { + if (result.getError(i) != null) { + errorSummary.append( + String.format("index %d: %s; ", i, result.getError(i).getMessage())); + } + } + + return String.format( + "succeeded=%d, failed=%d | results=[%s] | errors=[%s]", + result.succeeded().size(), + result.failed().size(), + successSummary, + errorSummary.toString().trim()); + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index 1fd49d93..e8585a42 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -548,4 +548,40 @@ void testSimpleMapExample() { assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); assertEquals("Hello, Alice! | Hello, ALICE! | Hello, alice!", result.getResult(String.class)); } + + @Test + void testMapErrorHandlingExample() { + var runner = + CloudDurableTestRunner.create(arn("map-error-handling-example"), GreetingRequest.class, String.class); + var result = runner.run(new GreetingRequest("Alice")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + var output = result.getResult(String.class); + assertNotNull(output); + + // 3 of 5 orders succeed, 2 fail + assertTrue(output.contains("succeeded=3")); + assertTrue(output.contains("failed=2")); + assertTrue(output.contains("Processed order-1 for Alice")); + assertTrue(output.contains("Processed order-3 for Alice")); + assertTrue(output.contains("Processed order-5 for Alice")); + } + + @Test + void testMapConfigExample() { + var runner = CloudDurableTestRunner.create(arn("map-config-example"), GreetingRequest.class, String.class); + var result = runner.run(new GreetingRequest("Alice")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + var output = result.getResult(String.class); + assertNotNull(output); + + // Sequential part: all 3 items processed + assertTrue(output.contains("ALPHA-Alice")); + assertTrue(output.contains("BETA-Alice")); + assertTrue(output.contains("GAMMA-Alice")); + + // Early termination part + assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); + } } diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java new file mode 100644 index 00000000..147ca425 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java @@ -0,0 +1,47 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class MapConfigExampleTest { + + @Test + void testSequentialAndEarlyTermination() { + var handler = new MapConfigExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.runUntilComplete(new GreetingRequest("Alice")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + var output = result.getResult(String.class); + + // Sequential part: all 3 items processed + assertTrue(output.contains("ALPHA-Alice")); + assertTrue(output.contains("BETA-Alice")); + assertTrue(output.contains("GAMMA-Alice")); + + // Early termination part: only 2 servers needed + assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); + assertTrue(output.contains("server-1:healthy")); + assertTrue(output.contains("server-2:healthy")); + } + + @Test + void testReplay() { + var handler = new MapConfigExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var input = new GreetingRequest("Bob"); + var result1 = runner.runUntilComplete(input); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + + // Replay — should return same results + var result2 = runner.runUntilComplete(input); + assertEquals(result1.getResult(String.class), result2.getResult(String.class)); + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java new file mode 100644 index 00000000..bb166e14 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java @@ -0,0 +1,57 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class MapErrorHandlingExampleTest { + + @Test + void testPartialFailure() { + var handler = new MapErrorHandlingExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.runUntilComplete(new GreetingRequest("Alice")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + var output = result.getResult(String.class); + + // 3 of 5 orders succeed, 2 fail + assertTrue(output.contains("succeeded=3")); + assertTrue(output.contains("failed=2")); + + // Successful orders are in the results + assertTrue(output.contains("Processed order-1 for Alice")); + assertTrue(output.contains("Processed order-3 for Alice")); + assertTrue(output.contains("Processed order-5 for Alice")); + + // Error messages are captured + assertTrue(output.contains("Invalid order: order-INVALID")); + assertTrue(output.contains("Processing error for: order-ERROR")); + } + + @Test + void testReplay() { + var handler = new MapErrorHandlingExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var input = new GreetingRequest("Bob"); + var result1 = runner.runUntilComplete(input); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + + // Replay — errors are not preserved in checkpoints (Throwable is not serializable), + // so the replay result will show failed=0 instead of failed=2. + // The successful results should still match. + var result2 = runner.runUntilComplete(input); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + var output = result2.getResult(String.class); + assertTrue(output.contains("succeeded=3")); + assertTrue(output.contains("Processed order-1 for Bob")); + assertTrue(output.contains("Processed order-3 for Bob")); + assertTrue(output.contains("Processed order-5 for Bob")); + } +} diff --git a/examples/template.yaml b/examples/template.yaml index b66b10dc..0487db95 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -493,6 +493,56 @@ Resources: DockerContext: ../ DockerTag: durable-examples + MapErrorHandlingExampleFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Image + FunctionName: !Join + - '' + - - 'map-error-handling-example' + - !Ref FunctionNameSuffix + ImageConfig: + Command: ["software.amazon.lambda.durable.examples.MapErrorHandlingExample::handleRequest"] + DurableConfig: + ExecutionTimeout: 300 + RetentionPeriodInDays: 7 + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:map-error-handling-example${FunctionNameSuffix}" + Metadata: + Dockerfile: !Ref DockerFile + DockerContext: ../ + DockerTag: durable-examples + + MapConfigExampleFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Image + FunctionName: !Join + - '' + - - 'map-config-example' + - !Ref FunctionNameSuffix + ImageConfig: + Command: ["software.amazon.lambda.durable.examples.MapConfigExample::handleRequest"] + DurableConfig: + ExecutionTimeout: 300 + RetentionPeriodInDays: 7 + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:map-config-example${FunctionNameSuffix}" + Metadata: + Dockerfile: !Ref DockerFile + DockerContext: ../ + DockerTag: durable-examples + Outputs: NoopExampleFunction: Description: Noop Example Function ARN @@ -646,3 +696,19 @@ Outputs: Description: Simple Map Example Function Name Value: !Ref SimpleMapExampleFunction + MapErrorHandlingExampleFunction: + Description: Map Error Handling Example Function ARN + Value: !GetAtt MapErrorHandlingExampleFunction.Arn + + MapErrorHandlingExampleFunctionName: + Description: Map Error Handling Example Function Name + Value: !Ref MapErrorHandlingExampleFunction + + MapConfigExampleFunction: + Description: Map Config Example Function ARN + Value: !GetAtt MapConfigExampleFunction.Arn + + MapConfigExampleFunctionName: + Description: Map Config Example Function Name + Value: !Ref MapConfigExampleFunction + diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapInputValidationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapInputValidationTest.java new file mode 100644 index 00000000..a433fca1 --- /dev/null +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapInputValidationTest.java @@ -0,0 +1,65 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.HashSet; +import java.util.List; +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.CompletionReason; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class MapInputValidationTest { + + @Test + void mapWithNullCollection_throwsNullPointerException() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + context.map("test", null, String.class, (ctx, item, index) -> item); + return "done"; + }); + + var result = runner.run("test"); + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } + + @Test + void mapWithNullFunction_throwsNullPointerException() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + context.map("test", List.of("a"), String.class, null); + return "done"; + }); + + var result = runner.run("test"); + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } + + @Test + void mapWithHashSet_throwsIllegalArgumentException() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = new HashSet<>(List.of("a", "b")); + context.map("test", items, String.class, (ctx, item, index) -> item); + return "done"; + }); + + var result = runner.run("test"); + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } + + @Test + void mapWithEmptyCollection_returnsEmptyBatchResult() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("empty-map", List.of(), String.class, (ctx, item, index) -> item); + + assertEquals(0, result.size()); + assertTrue(result.allSucceeded()); + assertEquals(CompletionReason.ALL_COMPLETED, result.completionReason()); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } +} diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java index 07ba7799..301f1222 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java @@ -4,8 +4,11 @@ import static org.junit.jupiter.api.Assertions.*; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.CompletionReason; import software.amazon.lambda.durable.model.ExecutionStatus; import software.amazon.lambda.durable.testing.LocalDurableTestRunner; @@ -49,4 +52,425 @@ void testMapWithStepsInsideBranches() { assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); assertEquals("HELLO WORLD", result.getResult(String.class)); } + + @Test + void testMapPartialFailure_failedItemDoesNotPreventOthers() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "FAIL", "c"); + var result = context.map("partial-fail", items, String.class, (ctx, item, index) -> { + if ("FAIL".equals(item)) { + throw new RuntimeException("item failed"); + } + return item.toUpperCase(); + }); + + // other items complete despite one failure + assertFalse(result.allSucceeded()); + assertEquals(3, result.size()); + + // failed item captured at corresponding index + assertEquals("A", result.getResult(0)); + assertNull(result.getResult(1)); + assertNotNull(result.getError(1)); + assertTrue(result.getError(1).getMessage().contains("item failed")); + assertEquals("C", result.getResult(2)); + + // successful items have no error + assertNull(result.getError(0)); + assertNull(result.getError(2)); + + assertEquals(CompletionReason.ALL_COMPLETED, result.completionReason()); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapMultipleFailures_allCapturedAtCorrectIndices() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok", "bad1", "ok2", "bad2"); + var result = context.map("multi-fail", items, String.class, (ctx, item, index) -> { + if (item.startsWith("bad")) { + throw new IllegalArgumentException("invalid: " + item); + } + return item.toUpperCase(); + }); + + assertFalse(result.allSucceeded()); + assertEquals(4, result.size()); + + // Successful items + assertEquals("OK", result.getResult(0)); + assertNull(result.getError(0)); + assertEquals("OK2", result.getResult(2)); + assertNull(result.getError(2)); + + // Failed items at correct indices + assertNull(result.getResult(1)); + assertNotNull(result.getError(1)); + assertTrue(result.getError(1).getMessage().contains("bad1")); + assertNull(result.getResult(3)); + assertNotNull(result.getError(3)); + assertTrue(result.getError(3).getMessage().contains("bad2")); + + assertEquals(2, result.succeeded().size()); + assertEquals(2, result.failed().size()); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapAllItemsFail() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("x", "y"); + var result = context.map("all-fail", items, String.class, (ctx, item, index) -> { + throw new RuntimeException("fail-" + item); + }); + + assertFalse(result.allSucceeded()); + assertEquals(2, result.size()); + assertEquals(0, result.succeeded().size()); + assertEquals(2, result.failed().size()); + + for (int i = 0; i < result.size(); i++) { + assertNull(result.getResult(i)); + assertNotNull(result.getError(i)); + } + assertTrue(result.getError(0).getMessage().contains("fail-x")); + assertTrue(result.getError(1).getMessage().contains("fail-y")); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapWithMaxConcurrency1_sequentialExecution() { + var peakConcurrency = new AtomicInteger(0); + var currentConcurrency = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b", "c", "d"); + var config = MapConfig.builder().maxConcurrency(1).build(); + var result = context.map( + "sequential-map", + items, + String.class, + (ctx, item, index) -> { + var concurrent = currentConcurrency.incrementAndGet(); + peakConcurrency.updateAndGet(peak -> Math.max(peak, concurrent)); + // Simulate some work via a durable step + var stepResult = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + currentConcurrency.decrementAndGet(); + return stepResult; + }, + config); + + assertTrue(result.allSucceeded()); + assertEquals(4, result.size()); + assertEquals("A", result.getResult(0)); + assertEquals("B", result.getResult(1)); + assertEquals("C", result.getResult(2)); + assertEquals("D", result.getResult(3)); + + return String.join(",", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A,B,C,D", result.getResult(String.class)); + // With maxConcurrency=1, at most 1 branch should run at a time + assertTrue(peakConcurrency.get() <= 1, "Expected peak concurrency <= 1 but was " + peakConcurrency.get()); + } + + @Test + void testMapWithMaxConcurrency2_limitedConcurrency() { + var peakConcurrency = new AtomicInteger(0); + var currentConcurrency = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b", "c", "d", "e"); + var config = MapConfig.builder().maxConcurrency(2).build(); + var result = context.map( + "limited-map", + items, + String.class, + (ctx, item, index) -> { + var concurrent = currentConcurrency.incrementAndGet(); + peakConcurrency.updateAndGet(peak -> Math.max(peak, concurrent)); + var stepResult = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + currentConcurrency.decrementAndGet(); + return stepResult; + }, + config); + + assertTrue(result.allSucceeded()); + assertEquals(5, result.size()); + assertEquals("A", result.getResult(0)); + assertEquals("B", result.getResult(1)); + assertEquals("C", result.getResult(2)); + assertEquals("D", result.getResult(3)); + assertEquals("E", result.getResult(4)); + + return String.join(",", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A,B,C,D,E", result.getResult(String.class)); + // With maxConcurrency=2, at most 2 branches should run at a time + assertTrue(peakConcurrency.get() <= 2, "Expected peak concurrency <= 2 but was " + peakConcurrency.get()); + } + + @Test + void testMapWithMaxConcurrencyNull_unlimitedConcurrency() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b", "c"); + // Default config has null maxConcurrency (unlimited) + var result = context.map("unlimited-map", items, String.class, (ctx, item, index) -> { + return item.toUpperCase(); + }); + + assertTrue(result.allSucceeded()); + assertEquals(3, result.size()); + assertEquals("A", result.getResult(0)); + assertEquals("B", result.getResult(1)); + assertEquals("C", result.getResult(2)); + + return String.join(",", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A,B,C", result.getResult(String.class)); + } + + @Test + void testMapWithMaxConcurrency1_partialFailure() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "FAIL", "c"); + var config = MapConfig.builder().maxConcurrency(1).build(); + var result = context.map( + "sequential-partial-fail", + items, + String.class, + (ctx, item, index) -> { + if ("FAIL".equals(item)) { + throw new RuntimeException("item failed"); + } + return item.toUpperCase(); + }, + config); + + assertFalse(result.allSucceeded()); + assertEquals(3, result.size()); + assertEquals("A", result.getResult(0)); + assertNull(result.getResult(1)); + assertNotNull(result.getError(1)); + assertEquals("C", result.getResult(2)); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapWithToleratedFailureCount_earlyTermination() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + // 5 items, maxConcurrency=1 so they run sequentially: ok, FAIL1, FAIL2, ok2, ok3 + // toleratedFailureCount=1 means we stop after the 2nd failure + var items = List.of("ok", "FAIL1", "FAIL2", "ok2", "ok3"); + var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.toleratedFailureCount(1)) + .build(); + var result = context.map( + "tolerated-fail", + items, + String.class, + (ctx, item, index) -> { + if (item.startsWith("FAIL")) { + throw new RuntimeException("failed: " + item); + } + return item.toUpperCase(); + }, + config); + + assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); + assertFalse(result.allSucceeded()); + // Items after early termination should not have been executed + assertEquals(5, result.size()); + assertEquals("OK", result.getResult(0)); + assertNull(result.getResult(1)); + assertNotNull(result.getError(1)); + assertNull(result.getResult(2)); + assertNotNull(result.getError(2)); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapWithMinSuccessful_earlyTermination() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + // 5 items, maxConcurrency=1 so they run sequentially + // minSuccessful=2 means we stop after 2 successes + var items = List.of("a", "b", "c", "d", "e"); + var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.minSuccessful(2)) + .build(); + var result = context.map( + "min-successful", items, String.class, (ctx, item, index) -> item.toUpperCase(), config); + + assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); + // First 2 items should have results, remaining should be null (never started) + assertEquals(5, result.size()); + assertEquals("A", result.getResult(0)); + assertEquals("B", result.getResult(1)); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapWithFirstSuccessful_stopsAfterFirstSuccess() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b", "c"); + var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.firstSuccessful()) + .build(); + var result = context.map( + "first-successful", items, String.class, (ctx, item, index) -> item.toUpperCase(), config); + + assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); + assertEquals(3, result.size()); + assertEquals("A", result.getResult(0)); + // Remaining items should not have been started + assertNull(result.getResult(1)); + assertNull(result.getResult(2)); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapReplayAfterInterruption_cachedResultsUsed() { + var executionCounts = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b", "c"); + var result = context.map("replay-map", items, String.class, (ctx, item, index) -> { + executionCounts.incrementAndGet(); + return item.toUpperCase(); + }); + + assertTrue(result.allSucceeded()); + assertEquals(3, result.size()); + assertEquals("A", result.getResult(0)); + assertEquals("B", result.getResult(1)); + assertEquals("C", result.getResult(2)); + + return String.join(",", result.results()); + }); + + // First execution — all items execute + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals("A,B,C", result1.getResult(String.class)); + var firstRunCount = executionCounts.get(); + assertTrue(firstRunCount >= 3, "Expected at least 3 executions on first run but got " + firstRunCount); + + // Second execution — replay should return cached results without re-executing + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("A,B,C", result2.getResult(String.class)); + assertEquals(firstRunCount, executionCounts.get(), "Map functions should not re-execute on replay"); + } + + @Test + void testMapReplayWithSteps_cachedStepResultsUsed() { + var stepExecutionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("hello", "world"); + var result = context.map("replay-steps-map", items, String.class, (ctx, item, index) -> { + return ctx.step("step-" + index, String.class, stepCtx -> { + stepExecutionCount.incrementAndGet(); + return item.toUpperCase(); + }); + }); + + assertTrue(result.allSucceeded()); + return String.join(" ", result.results()); + }); + + // First execution + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals("HELLO WORLD", result1.getResult(String.class)); + var firstRunStepCount = stepExecutionCount.get(); + assertTrue(firstRunStepCount >= 2, "Expected at least 2 step executions but got " + firstRunStepCount); + + // Replay — steps should not re-execute + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("HELLO WORLD", result2.getResult(String.class)); + assertEquals(firstRunStepCount, stepExecutionCount.get(), "Steps should not re-execute on replay"); + } + + @Test + void testNestedMap_mapInsideMapBranch() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var outerItems = List.of("group1", "group2"); + var outerResult = context.map("outer-map", outerItems, String.class, (outerCtx, group, outerIndex) -> { + // Each outer item runs an inner map + var innerItems = List.of(group + "-a", group + "-b"); + var innerResult = outerCtx.map( + "inner-map-" + outerIndex, + innerItems, + String.class, + (innerCtx, item, innerIndex) -> item.toUpperCase()); + + assertTrue(innerResult.allSucceeded()); + return String.join("+", innerResult.results()); + }); + + assertTrue(outerResult.allSucceeded()); + assertEquals(2, outerResult.size()); + assertEquals("GROUP1-A+GROUP1-B", outerResult.getResult(0)); + assertEquals("GROUP2-A+GROUP2-B", outerResult.getResult(1)); + + var combined = new ArrayList(); + for (int i = 0; i < outerResult.size(); i++) { + combined.add(outerResult.getResult(i)); + } + return String.join("|", combined); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("GROUP1-A+GROUP1-B|GROUP2-A+GROUP2-B", result.getResult(String.class)); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java index 21fe7c3f..df8de8f9 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java @@ -34,6 +34,21 @@ public static CompletionConfig firstSuccessful() { return new CompletionConfig(1, null, null); } + /** Complete when the specified number of items have succeeded. */ + public static CompletionConfig minSuccessful(int count) { + return new CompletionConfig(count, null, null); + } + + /** Complete when more than the specified number of failures have occurred. */ + public static CompletionConfig toleratedFailureCount(int count) { + return new CompletionConfig(null, count, null); + } + + /** Complete when the failure percentage exceeds the specified threshold (0.0 to 1.0). */ + public static CompletionConfig toleratedFailurePercentage(double percentage) { + return new CompletionConfig(null, null, percentage); + } + /** @return minimum number of successful items required, or null if not set */ public Integer minSuccessful() { return minSuccessful; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/BatchResult.java b/sdk/src/main/java/software/amazon/lambda/durable/model/BatchResult.java index 5f78259d..42f01aed 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/BatchResult.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/BatchResult.java @@ -2,6 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.model; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -11,11 +15,20 @@ *

Holds ordered results and errors from a batch of concurrent operations. Each index corresponds to the input item * at the same position. Includes the {@link CompletionReason} indicating why the operation completed. * + *

When serialized for checkpointing, only results and completionReason are included. Errors are excluded because + * they contain Throwable objects that are not reliably serializable. On replay from a small-result checkpoint, errors + * will be an empty list with null entries matching the results size. + * * @param the result type of each item */ public class BatchResult { + @JsonProperty("results") private final List results; + + @JsonIgnore private final List errors; + + @JsonProperty("completionReason") private final CompletionReason completionReason; public BatchResult(List results, List errors, CompletionReason completionReason) { @@ -24,6 +37,15 @@ public BatchResult(List results, List errors, CompletionReason com this.completionReason = completionReason; } + @JsonCreator + private BatchResult( + @JsonProperty("results") List results, + @JsonProperty("completionReason") CompletionReason completionReason) { + this.results = results != null ? results : Collections.emptyList(); + this.errors = new ArrayList<>(Collections.nCopies(this.results.size(), null)); + this.completionReason = completionReason != null ? completionReason : CompletionReason.ALL_COMPLETED; + } + /** Returns an empty BatchResult with no results and no errors. */ public static BatchResult empty() { return new BatchResult<>(Collections.emptyList(), Collections.emptyList(), CompletionReason.ALL_COMPLETED); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java index c8339b44..ed97bdf3 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java @@ -7,6 +7,8 @@ import java.util.Collections; import java.util.List; import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -45,6 +47,7 @@ public abstract class BaseConcurrentOperation extends BaseDurableOperation private final List> branches = new ArrayList<>(); private final Queue> pendingQueue = new ConcurrentLinkedQueue<>(); + private final Set> startedBranches = ConcurrentHashMap.newKeySet(); private final AtomicInteger activeBranches = new AtomicInteger(0); private final AtomicInteger succeeded = new AtomicInteger(0); private final AtomicInteger failed = new AtomicInteger(0); @@ -136,8 +139,20 @@ protected ChildContextOperation branchInternal( rootContext); branches.add(branch); + // Attach callback BEFORE execution starts (or before future can complete). + // The thenRun runs synchronously inside the synchronized(completionFuture) block + // when completionFuture.complete(null) is called, so it executes on the checkpoint + // processing thread. This callback only does lightweight work: update counters, + // evaluate CompletionConfig, dequeue and start next branch. + branch.completionFuture.thenRun(() -> { + var op = branch.getOperation(); + boolean success = op != null && op.status() == OperationStatus.SUCCEEDED; + onChildContextComplete(branch, success); + }); + if (!earlyTermination && (maxConcurrency == null || activeBranches.get() < maxConcurrency)) { activeBranches.incrementAndGet(); + startedBranches.add(branch); branch.execute(); } else { pendingQueue.add(branch); @@ -147,6 +162,11 @@ protected ChildContextOperation branchInternal( // ========== completion callback ========== + /** + * Called on the checkpoint processing thread when a branch's completionFuture completes. Only does lightweight + * work: update counters, evaluate CompletionConfig, dequeue and start next branch. Does NOT call + * finalizeOperation() or checkpointResult() — those happen in get() on the context thread. + */ protected void onChildContextComplete(ChildContextOperation branch, boolean success) { if (success) { succeeded.incrementAndGet(); @@ -167,6 +187,7 @@ protected void onChildContextComplete(ChildContextOperation branch, boolean s var next = pendingQueue.poll(); if (next != null) { // activeBranches stays the same (one completing, one starting) + startedBranches.add(next); next.execute(); // registers new thread internally via ChildContextOperation.start() } else { activeBranches.decrementAndGet(); @@ -175,10 +196,6 @@ protected void onChildContextComplete(ChildContextOperation branch, boolean s activeBranches.decrementAndGet(); } // completed branch's thread is deregistered by ChildContextOperation's close() in BaseContext - - if (activeBranches.get() == 0 && pendingQueue.isEmpty()) { - finalizeOperation(); - } } // ========== completion evaluation ========== @@ -206,7 +223,7 @@ private boolean shouldTerminateEarly() { return false; } - private CompletionReason evaluateCompletionReason() { + protected CompletionReason evaluateCompletionReason() { if (completionConfig.minSuccessful() != null && succeeded.get() >= completionConfig.minSuccessful()) { return CompletionReason.MIN_SUCCESSFUL_REACHED; } @@ -222,18 +239,25 @@ private void finalizeOperation() { checkpointResult(result); } - private void checkpointResult(R result) { + /** + * Checkpoints the parent concurrent operation as SUCCEEDED. Uses synchronous {@code sendOperationUpdate} because + * this is called from the context thread in {@code get()}, where it is safe to block. + * + *

Small results (<256KB) are checkpointed directly as payload. Large results are checkpointed with + * {@code replayChildren=true} and an empty payload, so on replay the result is reconstructed from child contexts. + */ + protected void checkpointResult(R result) { var serialized = serializeResult(result); var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8); if (serializedBytes.length < LARGE_RESULT_THRESHOLD) { - sendOperationUpdateAsync(OperationUpdate.builder() + sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) .subType(subType.getValue()) .payload(serialized)); } else { // Large result: checkpoint with empty payload + replayChildren flag - sendOperationUpdateAsync(OperationUpdate.builder() + sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) .subType(subType.getValue()) .payload("") @@ -296,4 +320,14 @@ protected boolean isEarlyTermination() { protected DurableContext getRootContext() { return rootContext; } + + /** Returns the pending queue of branches that have not yet been started. */ + protected Queue> getPendingQueue() { + return pendingQueue; + } + + /** Returns the set of branches that have been started (had execute() called). */ + protected Set> getStartedBranches() { + return startedBranches; + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java index 0868e98f..654055b2 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.MapConfig; import software.amazon.lambda.durable.MapFunction; @@ -69,19 +70,61 @@ protected void startBranches() { } /** - * Waits for all branches to complete and aggregates results. Overrides the base class get() to directly wait on - * each branch rather than relying on the parent operation's completion future, which avoids thread coordination - * issues between the checkpoint processing thread and the context thread. + * Waits for all branches to complete and aggregates results, then checkpoints the parent MAP operation. + * + *

Handles three cases: + * + *

    + *
  • Replay with small result (parent SUCCEEDED, no replayChildren): deserialize cached BatchResult directly + *
  • Replay with large result (parent SUCCEEDED + replayChildren): aggregate from child replays, no + * re-checkpoint needed + *
  • First execution or STARTED replay: aggregate from branches, then checkpoint parent result + *
*/ @Override - @SuppressWarnings("unchecked") public BatchResult get() { + // Check if parent operation already completed (replay with small result) + if (isOperationCompleted()) { + var op = getOperation(); + if (op != null && op.status() == OperationStatus.SUCCEEDED) { + if (op.contextDetails() != null + && Boolean.TRUE.equals(op.contextDetails().replayChildren())) { + // Large result on replay: aggregate from child replays + return aggregateResults(); + } + // Small result on replay: deserialize cached BatchResult + var result = (op.contextDetails() != null) ? op.contextDetails().result() : null; + return deserializeResult(result); + } + } + + // First execution, STARTED replay, or SUCCEEDED+replayChildren replay: aggregate from branches + var batchResult = aggregateResults(); + + // Check if parent is already SUCCEEDED (replayChildren case) — skip re-checkpointing + var existingOp = getOperation(); + if (existingOp == null || existingOp.status() != OperationStatus.SUCCEEDED) { + // First execution or STARTED: checkpoint parent result from context thread (safe to .join() here) + checkpointResult(batchResult); + } + + return batchResult; + } + + @Override + @SuppressWarnings("unchecked") + protected BatchResult aggregateResults() { var branches = getBranches(); + var pendingQueue = getPendingQueue(); var results = new ArrayList(Collections.nCopies(items.size(), null)); var errors = new ArrayList(Collections.nCopies(items.size(), null)); for (int i = 0; i < branches.size(); i++) { var branch = (ChildContextOperation) branches.get(i); + // Skip branches still in the pending queue (never started due to early termination) + if (pendingQueue.contains(branch)) { + continue; + } try { results.set(i, branch.get()); } catch (Exception e) { @@ -89,13 +132,10 @@ public BatchResult get() { } } - var reason = getCompletionReason() != null ? getCompletionReason() : CompletionReason.ALL_COMPLETED; + var reason = getCompletionReason(); + if (reason == null) { + reason = !pendingQueue.isEmpty() ? evaluateCompletionReason() : CompletionReason.ALL_COMPLETED; + } return new BatchResult<>(results, errors, reason); } - - @Override - protected BatchResult aggregateResults() { - // Not used — get() handles aggregation directly - throw new UnsupportedOperationException("aggregateResults should not be called directly"); - } } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java new file mode 100644 index 00000000..f57ce3ed --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java @@ -0,0 +1,64 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class CompletionConfigTest { + + @Test + void allSuccessful_zeroFailuresTolerated() { + var config = CompletionConfig.allSuccessful(); + + assertNull(config.minSuccessful()); + assertEquals(0, config.toleratedFailureCount()); + assertNull(config.toleratedFailurePercentage()); + } + + @Test + void allCompleted_allFieldsNull() { + var config = CompletionConfig.allCompleted(); + + assertNull(config.minSuccessful()); + assertNull(config.toleratedFailureCount()); + assertNull(config.toleratedFailurePercentage()); + } + + @Test + void firstSuccessful_minSuccessfulIsOne() { + var config = CompletionConfig.firstSuccessful(); + + assertEquals(1, config.minSuccessful()); + assertNull(config.toleratedFailureCount()); + assertNull(config.toleratedFailurePercentage()); + } + + @Test + void minSuccessful_setsCount() { + var config = CompletionConfig.minSuccessful(5); + + assertEquals(5, config.minSuccessful()); + assertNull(config.toleratedFailureCount()); + assertNull(config.toleratedFailurePercentage()); + } + + @Test + void toleratedFailureCount_setsCount() { + var config = CompletionConfig.toleratedFailureCount(3); + + assertNull(config.minSuccessful()); + assertEquals(3, config.toleratedFailureCount()); + assertNull(config.toleratedFailurePercentage()); + } + + @Test + void toleratedFailurePercentage_setsPercentage() { + var config = CompletionConfig.toleratedFailurePercentage(0.25); + + assertNull(config.minSuccessful()); + assertNull(config.toleratedFailureCount()); + assertEquals(0.25, config.toleratedFailurePercentage()); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java new file mode 100644 index 00000000..c2a50759 --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java @@ -0,0 +1,104 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.serde.JacksonSerDes; + +class MapConfigTest { + + @Test + void defaultBuilder_hasNullMaxConcurrency() { + var config = MapConfig.builder().build(); + + assertNull(config.maxConcurrency()); + } + + @Test + void defaultBuilder_completionConfigDefaultsToAllCompleted() { + var config = MapConfig.builder().build(); + + var completion = config.completionConfig(); + assertNotNull(completion); + assertNull(completion.minSuccessful()); + assertNull(completion.toleratedFailureCount()); + assertNull(completion.toleratedFailurePercentage()); + } + + @Test + void defaultBuilder_hasNullSerDes() { + var config = MapConfig.builder().build(); + + assertNull(config.serDes()); + } + + @Test + void builderWithMaxConcurrency() { + var config = MapConfig.builder().maxConcurrency(5).build(); + + assertEquals(5, config.maxConcurrency()); + } + + @Test + void builderWithCompletionConfig() { + var completion = CompletionConfig.allSuccessful(); + + var config = MapConfig.builder().completionConfig(completion).build(); + + assertSame(completion, config.completionConfig()); + } + + @Test + void builderWithSerDes() { + var serDes = new JacksonSerDes(); + + var config = MapConfig.builder().serDes(serDes).build(); + + assertSame(serDes, config.serDes()); + } + + @Test + void builderChaining() { + var completion = CompletionConfig.firstSuccessful(); + var serDes = new JacksonSerDes(); + + var config = MapConfig.builder() + .maxConcurrency(3) + .completionConfig(completion) + .serDes(serDes) + .build(); + + assertEquals(3, config.maxConcurrency()); + assertSame(completion, config.completionConfig()); + assertSame(serDes, config.serDes()); + } + + @Test + void toBuilder_preservesValues() { + var completion = CompletionConfig.minSuccessful(2); + var serDes = new JacksonSerDes(); + var original = MapConfig.builder() + .maxConcurrency(4) + .completionConfig(completion) + .serDes(serDes) + .build(); + + var copy = original.toBuilder().build(); + + assertEquals(4, copy.maxConcurrency()); + assertSame(completion, copy.completionConfig()); + assertSame(serDes, copy.serDes()); + } + + @Test + void toBuilder_canOverrideValues() { + var original = MapConfig.builder().maxConcurrency(4).build(); + + var modified = original.toBuilder().maxConcurrency(10).build(); + + assertEquals(10, modified.maxConcurrency()); + assertEquals(4, original.maxConcurrency()); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java b/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java new file mode 100644 index 00000000..b2ebdf91 --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java @@ -0,0 +1,52 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class MapFunctionTest { + + @Test + void isFunctionalInterface() { + assertTrue(MapFunction.class.isAnnotationPresent(FunctionalInterface.class)); + } + + @Test + void canBeUsedAsLambda() throws Exception { + MapFunction fn = (ctx, item, index) -> item.toUpperCase(); + + var result = fn.apply(null, "hello", 0); + + assertEquals("HELLO", result); + } + + @Test + void receivesCorrectIndex() throws Exception { + MapFunction fn = (ctx, item, index) -> index; + + assertEquals(0, fn.apply(null, "a", 0)); + assertEquals(5, fn.apply(null, "b", 5)); + } + + @Test + void canThrowCheckedException() { + MapFunction fn = (ctx, item, index) -> { + throw new Exception("checked"); + }; + + var ex = assertThrows(Exception.class, () -> fn.apply(null, "x", 0)); + assertEquals("checked", ex.getMessage()); + } + + @Test + void canThrowRuntimeException() { + MapFunction fn = (ctx, item, index) -> { + throw new IllegalArgumentException("bad input"); + }; + + var ex = assertThrows(IllegalArgumentException.class, () -> fn.apply(null, "x", 0)); + assertEquals("bad input", ex.getMessage()); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/model/BatchResultTest.java b/sdk/src/test/java/software/amazon/lambda/durable/model/BatchResultTest.java new file mode 100644 index 00000000..4bdd9aef --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/model/BatchResultTest.java @@ -0,0 +1,101 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.model; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Arrays; +import java.util.List; +import org.junit.jupiter.api.Test; + +class BatchResultTest { + + @Test + void empty_returnsZeroSizeResult() { + var result = BatchResult.empty(); + + assertEquals(0, result.size()); + assertTrue(result.allSucceeded()); + assertEquals(CompletionReason.ALL_COMPLETED, result.completionReason()); + assertTrue(result.results().isEmpty()); + assertTrue(result.succeeded().isEmpty()); + assertTrue(result.failed().isEmpty()); + } + + @Test + void allSucceeded_trueWhenNoErrors() { + var result = new BatchResult<>(List.of("a", "b"), Arrays.asList(null, null), CompletionReason.ALL_COMPLETED); + + assertTrue(result.allSucceeded()); + assertEquals(2, result.size()); + assertEquals("a", result.getResult(0)); + assertEquals("b", result.getResult(1)); + assertNull(result.getError(0)); + assertNull(result.getError(1)); + } + + @Test + void allSucceeded_falseWhenAnyError() { + var error = new RuntimeException("fail"); + var result = + new BatchResult<>(Arrays.asList("a", null), Arrays.asList(null, error), CompletionReason.ALL_COMPLETED); + + assertFalse(result.allSucceeded()); + } + + @Test + void getResult_returnsNullForFailedItem() { + var error = new RuntimeException("fail"); + var result = + new BatchResult<>(Arrays.asList("a", null), Arrays.asList(null, error), CompletionReason.ALL_COMPLETED); + + assertEquals("a", result.getResult(0)); + assertNull(result.getResult(1)); + } + + @Test + void getError_returnsNullForSucceededItem() { + var error = new RuntimeException("fail"); + var result = + new BatchResult<>(Arrays.asList("a", null), Arrays.asList(null, error), CompletionReason.ALL_COMPLETED); + + assertNull(result.getError(0)); + assertSame(error, result.getError(1)); + } + + @Test + void succeeded_filtersNullResults() { + var result = new BatchResult<>( + Arrays.asList("a", null, "c"), + Arrays.asList(null, new RuntimeException(), null), + CompletionReason.ALL_COMPLETED); + + assertEquals(List.of("a", "c"), result.succeeded()); + } + + @Test + void failed_filtersNullErrors() { + var error = new RuntimeException("fail"); + var result = new BatchResult<>( + Arrays.asList("a", null, "c"), Arrays.asList(null, error, null), CompletionReason.ALL_COMPLETED); + + var failures = result.failed(); + assertEquals(1, failures.size()); + assertSame(error, failures.get(0)); + } + + @Test + void completionReason_preserved() { + var result = new BatchResult<>( + List.of("a"), Arrays.asList((Throwable) null), CompletionReason.MIN_SUCCESSFUL_REACHED); + + assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); + } + + @Test + void results_returnsUnmodifiableList() { + var result = new BatchResult<>(List.of("a"), Arrays.asList((Throwable) null), CompletionReason.ALL_COMPLETED); + + assertThrows(UnsupportedOperationException.class, () -> result.results().add("b")); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java b/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java index e8d74a9a..0ee0ee0c 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java @@ -5,6 +5,15 @@ import static org.junit.jupiter.api.Assertions.*; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.junit.jupiter.api.Test; class ParameterValidatorTest { @@ -290,4 +299,75 @@ void validateOperationName_withSingleCharacter_shouldPass() { assertDoesNotThrow(() -> ParameterValidator.validateOperationName("1")); assertDoesNotThrow(() -> ParameterValidator.validateOperationName("-")); } + + // ========== validateOrderedCollection ========== + + @Test + void validateOrderedCollection_withNull_shouldThrow() { + var exception = + assertThrows(IllegalArgumentException.class, () -> ParameterValidator.validateOrderedCollection(null)); + + assertEquals("items cannot be null", exception.getMessage()); + } + + @Test + void validateOrderedCollection_withList_shouldPass() { + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(List.of("a", "b"))); + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new ArrayList<>(List.of(1, 2)))); + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new LinkedList<>(List.of("x")))); + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new CopyOnWriteArrayList<>(List.of(1)))); + } + + @Test + void validateOrderedCollection_withEmptyList_shouldPass() { + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(List.of())); + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new ArrayList<>())); + } + + @Test + void validateOrderedCollection_withOrderedSet_shouldPass() { + // TreeSet has deterministic order and does not extend HashSet + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new TreeSet<>(List.of("a", "b")))); + } + + @Test + void validateOrderedCollection_withLinkedHashSet_shouldThrow() { + // LinkedHashSet extends HashSet, so it's rejected even though it has deterministic order + assertThrows( + IllegalArgumentException.class, + () -> ParameterValidator.validateOrderedCollection(new LinkedHashSet<>(List.of("a", "b")))); + } + + @Test + void validateOrderedCollection_withHashSet_shouldThrow() { + var exception = assertThrows( + IllegalArgumentException.class, + () -> ParameterValidator.validateOrderedCollection(new HashSet<>(List.of("a")))); + + assertEquals("items must have deterministic iteration order", exception.getMessage()); + } + + @Test + void validateOrderedCollection_withHashMapKeySet_shouldThrow() { + var map = new HashMap(); + map.put("key", "value"); + + assertThrows(IllegalArgumentException.class, () -> ParameterValidator.validateOrderedCollection(map.keySet())); + } + + @Test + void validateOrderedCollection_withHashMapValues_shouldThrow() { + var map = new HashMap(); + map.put("key", "value"); + + assertThrows(IllegalArgumentException.class, () -> ParameterValidator.validateOrderedCollection(map.values())); + } + + @Test + void validateOrderedCollection_withConcurrentHashMapKeySet_shouldThrow() { + var map = new ConcurrentHashMap(); + map.put("key", "value"); + + assertThrows(IllegalArgumentException.class, () -> ParameterValidator.validateOrderedCollection(map.keySet())); + } }