generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 6
feat(map): Add completion config, BatchResult serialization, and thread-safe map execution #214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| ## map() – Data-Driven Concurrent Execution | ||
|
|
||
| `map()` applies a function to each item in a collection concurrently, with each item running in its own child context. Results are collected into a `BatchResult<T>` that maintains input order. | ||
|
|
||
| ```java | ||
| // Basic map: process items concurrently | ||
| var items = List.of("order-1", "order-2", "order-3"); | ||
| var result = ctx.map("process-orders", items, OrderResult.class, (childCtx, orderId, index) -> { | ||
| return childCtx.step("fetch-" + index, OrderResult.class, | ||
| stepCtx -> orderService.process(orderId)); | ||
| }); | ||
|
|
||
| // Results maintain input order | ||
| OrderResult first = result.getResult(0); // result for "order-1" | ||
| OrderResult second = result.getResult(1); // result for "order-2" | ||
| assertTrue(result.allSucceeded()); | ||
| ``` | ||
|
|
||
| Each item's function receives its own `DurableContext`, so you can use any durable operation (`step()`, `wait()`, `invoke()`, etc.) inside the map function. | ||
|
|
||
| ### mapAsync() – Non-Blocking Map | ||
|
|
||
| `mapAsync()` starts the map operation without blocking, returning a `DurableFuture<BatchResult<T>>`: | ||
|
|
||
| ```java | ||
| var future = ctx.mapAsync("process-orders", items, OrderResult.class, (childCtx, orderId, index) -> { | ||
| return childCtx.step("process-" + index, OrderResult.class, stepCtx -> process(orderId)); | ||
| }); | ||
|
|
||
| // Do other work while map runs... | ||
| var otherResult = ctx.step("other-work", String.class, stepCtx -> doOtherWork()); | ||
|
|
||
| // Block when you need the results | ||
| BatchResult<OrderResult> result = future.get(); | ||
| ``` | ||
|
|
||
| ### BatchResult | ||
|
|
||
| `BatchResult<T>` holds ordered results and errors from the map operation: | ||
|
|
||
| | Method | Description | | ||
| |--------|-------------| | ||
| | `getResult(i)` | Result at index `i`, or `null` if that item failed | | ||
| | `getError(i)` | Error at index `i`, or `null` if that item succeeded | | ||
| | `allSucceeded()` | `true` if every item succeeded | | ||
| | `size()` | Number of items in the batch | | ||
| | `results()` | All results as an unmodifiable list | | ||
| | `succeeded()` | Only the non-null (successful) results | | ||
| | `failed()` | Only the non-null errors | | ||
| | `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) | | ||
|
|
||
| ### Error Isolation | ||
|
|
||
| One item's failure does not prevent other items from completing. Failed items are captured in the `BatchResult` at their corresponding index: | ||
|
|
||
| ```java | ||
| var result = ctx.map("risky-work", items, String.class, (childCtx, item, index) -> { | ||
| if (item.equals("bad")) throw new RuntimeException("failed"); | ||
| return item.toUpperCase(); | ||
| }); | ||
|
|
||
| assertFalse(result.allSucceeded()); | ||
| assertNotNull(result.getError(1)); // the failed item | ||
| assertEquals("A", result.getResult(0)); // other items still succeed | ||
| ``` | ||
|
|
||
| ### MapConfig | ||
|
|
||
| Configure concurrency limits and completion criteria with `MapConfig`: | ||
|
|
||
| ```java | ||
| var config = MapConfig.builder() | ||
| .maxConcurrency(5) // at most 5 items run at once | ||
| .completionConfig(CompletionConfig.allCompleted()) // default: run all items | ||
| .build(); | ||
|
|
||
| var result = ctx.map("process-orders", items, OrderResult.class, | ||
| (childCtx, orderId, index) -> process(childCtx, orderId), config); | ||
| ``` | ||
|
|
||
| #### Concurrency Limiting | ||
|
|
||
| `maxConcurrency` controls how many items execute concurrently. When set, items beyond the limit are queued and started as earlier items complete. Default is `null` (unlimited). | ||
|
|
||
| ```java | ||
| // Sequential execution: one item at a time | ||
| var sequential = MapConfig.builder().maxConcurrency(1).build(); | ||
|
|
||
| // Limited concurrency | ||
| var limited = MapConfig.builder().maxConcurrency(3).build(); | ||
| ``` | ||
|
|
||
| #### CompletionConfig | ||
|
|
||
| `CompletionConfig` controls when the map operation stops starting new items: | ||
|
|
||
| | Factory Method | Behavior | | ||
| |----------------|----------| | ||
| | `allCompleted()` (default) | All items run regardless of failures | | ||
| | `allSuccessful()` | Stop if any item fails (zero failures tolerated) | | ||
| | `firstSuccessful()` | Stop after the first item succeeds | | ||
| | `minSuccessful(n)` | Stop after `n` items succeed | | ||
| | `toleratedFailureCount(n)` | Stop after more than `n` failures | | ||
| | `toleratedFailurePercentage(p)` | Stop when failure rate exceeds `p` (0.0–1.0) | | ||
|
|
||
| ```java | ||
| // Stop after 2 successes | ||
| var config = MapConfig.builder() | ||
| .maxConcurrency(1) | ||
| .completionConfig(CompletionConfig.minSuccessful(2)) | ||
| .build(); | ||
|
|
||
| var result = ctx.map("find-two", items, String.class, fn, config); | ||
| assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); | ||
| ``` | ||
|
|
||
| When early termination triggers, items that were never started have `null` for both result and error in the `BatchResult`. | ||
|
|
||
| ### Checkpoint-and-Replay | ||
|
|
||
| Map operations are fully durable. On replay after interruption: | ||
|
|
||
| - Completed items return cached results without re-execution | ||
| - Incomplete items resume from their last checkpoint | ||
| - Items that never started execute fresh | ||
|
|
||
| Small results (< 256KB) are checkpointed directly. Large results are reconstructed from individual child context checkpoints on replay. | ||
|
|
||
| ### Input Collection Requirements | ||
|
|
||
| The input collection must have deterministic iteration order. `List`, `LinkedList`, and `TreeSet` are accepted. `HashSet` and unordered map views are rejected with `IllegalArgumentException`. | ||
|
|
||
| ```java | ||
| // OK | ||
| ctx.map("work", List.of("a", "b"), String.class, fn); | ||
| ctx.map("work", new ArrayList<>(items), String.class, fn); | ||
|
|
||
| // Throws IllegalArgumentException | ||
| ctx.map("work", new HashSet<>(items), String.class, fn); | ||
| ``` | ||
|
|
||
| ### MapFunction Interface | ||
|
|
||
| The function passed to `map()` is a `MapFunction<I, O>`: | ||
|
|
||
| ```java | ||
| @FunctionalInterface | ||
| public interface MapFunction<I, O> { | ||
| O apply(DurableContext context, I item, int index) throws Exception; | ||
| } | ||
| ``` | ||
|
|
||
| The `index` parameter is the zero-based position of the item in the input collection, useful for naming operations or correlating results. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| package software.amazon.lambda.durable.examples; | ||
|
|
||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import software.amazon.lambda.durable.CompletionConfig; | ||
| import software.amazon.lambda.durable.DurableContext; | ||
| import software.amazon.lambda.durable.DurableHandler; | ||
| import software.amazon.lambda.durable.MapConfig; | ||
|
|
||
| /** | ||
| * Example demonstrating MapConfig options: concurrency limiting and completion strategies. | ||
| * | ||
| * <p>This handler runs two map operations to showcase different configurations: | ||
| * | ||
| * <ol> | ||
| * <li>Sequential processing with {@code maxConcurrency(1)} — items run one at a time | ||
| * <li>Early termination with {@code minSuccessful(2)} — stops after 2 items succeed | ||
| * </ol> | ||
| */ | ||
| public class MapConfigExample extends DurableHandler<GreetingRequest, String> { | ||
|
|
||
| @Override | ||
| public String handleRequest(GreetingRequest input, DurableContext context) { | ||
| var name = input.getName(); | ||
| context.getLogger().info("Starting map config example for {}", name); | ||
|
|
||
| // Part 1: Sequential execution with maxConcurrency=1 | ||
| var items = List.of("alpha", "beta", "gamma"); | ||
| var sequentialConfig = MapConfig.builder().maxConcurrency(1).build(); | ||
|
|
||
| var sequentialResult = context.map( | ||
| "sequential-processing", | ||
| items, | ||
| String.class, | ||
| (ctx, item, index) -> { | ||
| return ctx.step("transform-" + index, String.class, stepCtx -> item.toUpperCase() + "-" + name); | ||
| }, | ||
| sequentialConfig); | ||
|
|
||
| var sequentialOutput = String.join(", ", sequentialResult.results()); | ||
| context.getLogger().info("Sequential result: {}", sequentialOutput); | ||
|
|
||
| // Part 2: Early termination with minSuccessful(2) | ||
| var candidates = List.of("server-1", "server-2", "server-3", "server-4", "server-5"); | ||
| var earlyTermConfig = MapConfig.builder() | ||
| .maxConcurrency(1) | ||
| .completionConfig(CompletionConfig.minSuccessful(2)) | ||
| .build(); | ||
|
|
||
| var earlyTermResult = context.map( | ||
| "find-healthy-servers", | ||
| candidates, | ||
| String.class, | ||
| (ctx, server, index) -> { | ||
| return ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"); | ||
| }, | ||
| earlyTermConfig); | ||
|
|
||
| context.getLogger() | ||
| .info( | ||
| "Early termination: reason={}, succeeded={}", | ||
| earlyTermResult.completionReason(), | ||
| earlyTermResult.succeeded().size()); | ||
|
|
||
| var healthyServers = earlyTermResult.succeeded().stream().collect(Collectors.joining(", ")); | ||
|
|
||
| return String.format( | ||
| "sequential=[%s] | earlyTerm=[%s] reason=%s", | ||
| sequentialOutput, healthyServers, earlyTermResult.completionReason()); | ||
| } | ||
| } |
78 changes: 78 additions & 0 deletions
78
examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| package software.amazon.lambda.durable.examples; | ||
|
|
||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import software.amazon.lambda.durable.DurableContext; | ||
| import software.amazon.lambda.durable.DurableHandler; | ||
| import software.amazon.lambda.durable.StepConfig; | ||
| import software.amazon.lambda.durable.retry.RetryStrategies; | ||
|
|
||
| /** | ||
| * Example demonstrating error handling with the map operation. | ||
| * | ||
| * <p>Shows how individual item failures are isolated and captured in the {@code BatchResult}, while other items | ||
| * continue to succeed. Demonstrates inspecting partial results using {@code allSucceeded()}, {@code getError()}, | ||
| * {@code succeeded()}, and {@code failed()}. | ||
| * | ||
| * <ol> | ||
| * <li>Map over a list of order IDs concurrently | ||
| * <li>Some orders intentionally fail to simulate real-world partial failures | ||
| * <li>Inspect the BatchResult to handle successes and failures separately | ||
| * </ol> | ||
| */ | ||
| public class MapErrorHandlingExample extends DurableHandler<GreetingRequest, String> { | ||
|
|
||
| @Override | ||
| public String handleRequest(GreetingRequest input, DurableContext context) { | ||
| var name = input.getName(); | ||
| context.getLogger().info("Starting map error handling example for {}", name); | ||
|
|
||
| var orderIds = List.of("order-1", "order-INVALID", "order-3", "order-ERROR", "order-5"); | ||
|
|
||
| // Map over orders — some will fail, but others continue processing | ||
| var result = context.map("process-orders", orderIds, String.class, (ctx, orderId, index) -> { | ||
| return ctx.step( | ||
| "process-" + index, | ||
| String.class, | ||
| stepCtx -> { | ||
| if (orderId.contains("INVALID")) { | ||
| throw new IllegalArgumentException("Invalid order: " + orderId); | ||
| } | ||
| if (orderId.contains("ERROR")) { | ||
| throw new RuntimeException("Processing error for: " + orderId); | ||
| } | ||
| return "Processed " + orderId + " for " + name; | ||
| }, | ||
| StepConfig.builder() | ||
| .retryStrategy(RetryStrategies.Presets.NO_RETRY) | ||
| .build()); | ||
| }); | ||
|
|
||
| context.getLogger() | ||
| .info( | ||
| "Map completed: allSucceeded={}, succeeded={}, failed={}", | ||
| result.allSucceeded(), | ||
| result.succeeded().size(), | ||
| result.failed().size()); | ||
|
|
||
| // Build a summary showing successful results and error messages | ||
| var successSummary = result.succeeded().stream().collect(Collectors.joining(", ")); | ||
|
|
||
| var errorSummary = new StringBuilder(); | ||
| for (int i = 0; i < result.size(); i++) { | ||
| if (result.getError(i) != null) { | ||
| errorSummary.append( | ||
| String.format("index %d: %s; ", i, result.getError(i).getMessage())); | ||
| } | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's add a wait before the end of each example so that we know the replay works. |
||
|
|
||
| return String.format( | ||
| "succeeded=%d, failed=%d | results=[%s] | errors=[%s]", | ||
| result.succeeded().size(), | ||
| result.failed().size(), | ||
| successSummary, | ||
| errorSummary.toString().trim()); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep this consistent: DurableContext is always the last parameter of user provided functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove
throws Exceptionto be consistent with all other user provided functions