4 changes: 3 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
@@ -201,12 +201,14 @@ void testAgainstRealLambda() {
| Class | Responsibility |
|-------|----------------|
| `DurableHandler<I,O>` | Lambda entry point, extend this |
| `DurableContext` | User API: `step()`, `wait()` |
| `DurableContext` | User API: `step()`, `wait()`, `map()` |
| `DurableExecutor` | Orchestrates execution lifecycle |
| `ExecutionManager` | Thread coordination, state management |
| `CheckpointBatcher` | Batches checkpoint API calls (750KB limit) |
| `StepOperation` | Executes steps with retry logic |
| `WaitOperation` | Handles wait checkpointing |
| `MapOperation` | Applies a function across items concurrently via child contexts |
| `BaseConcurrentOperation` | Shared base for map/parallel: concurrency limiting, completion evaluation |

## Common Tasks

3 changes: 3 additions & 0 deletions README.md
@@ -19,6 +19,7 @@ Build resilient, long-running AWS Lambda functions that automatically checkpoint
- **Configurable Retries** – Built-in retry strategies with exponential backoff and jitter
- **Replay Safety** – Functions deterministically resume from checkpoints after interruptions
- **Type Safety** – Full generic type support for step results
- **Data-Driven Concurrency** – Apply a function across a collection with `map()`, with per-item error isolation and configurable completion criteria

## How It Works

@@ -30,6 +31,7 @@ Your durable function extends `DurableHandler<I, O>` and implements `handleReque
- `ctx.waitForCallback()` – Simplify callback handling by combining callback creation and submission in one operation
- `ctx.invoke()` – Invoke another Lambda function and wait for the result
- `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log
- `ctx.map()` – Apply a function to each item in a collection concurrently

## Quick Start

@@ -91,6 +93,7 @@ See [Deploy Lambda durable functions with Infrastructure as Code](https://docs.a
- [<u>Callbacks</u>](docs/core/callbacks.md) - Wait for external systems to respond
- [<u>Invoke</u>](docs/core/invoke.md) - Call other durable functions
- [<u>Child Contexts</u>](docs/core/child-contexts.md) - Organize complex workflows into isolated units
- [<u>Map</u>](docs/core/map.md) - Apply a function across a collection concurrently

**Examples**

153 changes: 153 additions & 0 deletions docs/core/map.md
@@ -0,0 +1,153 @@
## map() – Data-Driven Concurrent Execution

`map()` applies a function to each item in a collection concurrently, with each item running in its own child context. Results are collected into a `BatchResult<T>` that maintains input order.

```java
// Basic map: process items concurrently
var items = List.of("order-1", "order-2", "order-3");
var result = ctx.map("process-orders", items, OrderResult.class, (childCtx, orderId, index) -> {
    return childCtx.step("fetch-" + index, OrderResult.class,
        stepCtx -> orderService.process(orderId));
});

// Results maintain input order
OrderResult first = result.getResult(0); // result for "order-1"
OrderResult second = result.getResult(1); // result for "order-2"
assertTrue(result.allSucceeded());
```

Each item's function receives its own `DurableContext`, so you can use any durable operation (`step()`, `wait()`, `invoke()`, etc.) inside the map function.

### mapAsync() – Non-Blocking Map

`mapAsync()` starts the map operation without blocking, returning a `DurableFuture<BatchResult<T>>`:

```java
var future = ctx.mapAsync("process-orders", items, OrderResult.class, (childCtx, orderId, index) -> {
    return childCtx.step("process-" + index, OrderResult.class, stepCtx -> process(orderId));
});

// Do other work while map runs...
var otherResult = ctx.step("other-work", String.class, stepCtx -> doOtherWork());

// Block when you need the results
BatchResult<OrderResult> result = future.get();
```

### BatchResult

`BatchResult<T>` holds ordered results and errors from the map operation:

| Method | Description |
|--------|-------------|
| `getResult(i)` | Result at index `i`, or `null` if that item failed |
| `getError(i)` | Error at index `i`, or `null` if that item succeeded |
| `allSucceeded()` | `true` if every item succeeded |
| `size()` | Number of items in the batch |
| `results()` | All results as an unmodifiable list |
| `succeeded()` | Only the non-null (successful) results |
| `failed()` | Only the non-null errors |
| `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) |
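
To make the accessor semantics in the table concrete, here is a minimal sketch of a container that behaves the same way for `getResult`, `getError`, `succeeded`, and `failed`. `BatchResultSketch` is a hypothetical stand-in written for illustration; it is not the SDK's `BatchResult`, and its `allSucceeded()` rule is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical illustration only; not the SDK's BatchResult implementation.
final class BatchResultSketch<T> {
    private final List<T> results;        // null where the item failed or never started
    private final List<Throwable> errors; // null where the item succeeded or never started

    BatchResultSketch(List<? extends T> results, List<? extends Throwable> errors) {
        if (results.size() != errors.size()) {
            throw new IllegalArgumentException("results and errors must have the same size");
        }
        // ArrayList copies are used because List.copyOf rejects null elements.
        this.results = Collections.unmodifiableList(new ArrayList<>(results));
        this.errors = Collections.unmodifiableList(new ArrayList<>(errors));
    }

    T getResult(int i) { return results.get(i); }

    Throwable getError(int i) { return errors.get(i); }

    int size() { return results.size(); }

    // Assumption: "all succeeded" means no error was recorded at any index.
    // A real implementation would also have to account for items that never started.
    boolean allSucceeded() {
        return errors.stream().allMatch(Objects::isNull);
    }

    // Only the non-null results, in input order.
    List<T> succeeded() {
        return results.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    // Only the non-null errors, in input order.
    List<Throwable> failed() {
        return errors.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }
}
```

Keeping results and errors in two index-aligned lists is one simple way to preserve input order while still letting callers filter successes and failures separately.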

### Error Isolation

One item's failure does not prevent other items from completing. Failed items are captured in the `BatchResult` at their corresponding index:

```java
var items = List.of("a", "bad", "c");
var result = ctx.map("risky-work", items, String.class, (childCtx, item, index) -> {
    if (item.equals("bad")) throw new RuntimeException("failed");
    return item.toUpperCase();
});

assertFalse(result.allSucceeded());
assertNotNull(result.getError(1)); // the failed item
assertEquals("A", result.getResult(0)); // other items still succeed
```

### MapConfig

Configure concurrency limits and completion criteria with `MapConfig`:

```java
var config = MapConfig.builder()
    .maxConcurrency(5)                                   // at most 5 items run at once
    .completionConfig(CompletionConfig.allCompleted())   // default: run all items
    .build();

var result = ctx.map("process-orders", items, OrderResult.class,
    (childCtx, orderId, index) -> process(childCtx, orderId), config);
```

#### Concurrency Limiting

`maxConcurrency` controls how many items execute at the same time. When it is set, items beyond the limit are queued and start as earlier items complete. The default is `null` (unlimited).

```java
// Sequential execution: one item at a time
var sequential = MapConfig.builder().maxConcurrency(1).build();

// Limited concurrency
var limited = MapConfig.builder().maxConcurrency(3).build();
```
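
The queueing behavior can be sketched with plain JDK primitives. The example below is a simplified illustration, not the SDK's implementation: `BoundedMapSketch` and `boundedMap` are hypothetical names, and the sketch assumes a `Semaphore` gates how many items run at once. It leaves out checkpointing and per-item error isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical illustration of concurrency limiting; not the SDK's MapOperation.
final class BoundedMapSketch {
    static <I, O> List<O> boundedMap(List<I> items, int maxConcurrency, Function<I, O> fn) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(maxConcurrency);
        try {
            List<Future<O>> futures = new ArrayList<>();
            for (I item : items) {
                // Blocks while maxConcurrency items are running, so queued items
                // start only as earlier items complete.
                permits.acquire();
                futures.add(pool.submit(() -> {
                    try {
                        return fn.apply(item);
                    } finally {
                        permits.release();
                    }
                }));
            }
            // Collect in submission order so the output matches the input order.
            List<O> out = new ArrayList<>();
            for (Future<O> future : futures) {
                out.add(future.get());
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Unlike map(), this sketch fails the whole batch on the first item error.
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With `maxConcurrency` set to 1 the loop degenerates to sequential execution, which mirrors the `maxConcurrency(1)` configuration shown above.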

#### CompletionConfig

`CompletionConfig` controls when the map operation stops starting new items:

| Factory Method | Behavior |
|----------------|----------|
| `allCompleted()` (default) | All items run regardless of failures |
| `allSuccessful()` | Stop if any item fails (zero failures tolerated) |
| `firstSuccessful()` | Stop after the first item succeeds |
| `minSuccessful(n)` | Stop after `n` items succeed |
| `toleratedFailureCount(n)` | Stop after more than `n` failures |
| `toleratedFailurePercentage(p)` | Stop when failure rate exceeds `p` (0.0–1.0) |

```java
// Stop after 2 successes
var config = MapConfig.builder()
    .maxConcurrency(1)
    .completionConfig(CompletionConfig.minSuccessful(2))
    .build();

var result = ctx.map("find-two", items, String.class, fn, config);
assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason());
```

When early termination triggers, items that were never started have `null` for both result and error in the `BatchResult`.
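
The completion criteria in the table can be read as a check that runs after each item finishes. The following sketch shows one plausible way to evaluate them; `CompletionSketch`, its `evaluate` method, and the order of the checks are assumptions made for illustration, and only the three reason names come from the `completionReason()` values listed earlier.

```java
// Hypothetical illustration of completion evaluation; not the SDK's logic.
final class CompletionSketch {
    enum Reason { ALL_COMPLETED, MIN_SUCCESSFUL_REACHED, FAILURE_TOLERANCE_EXCEEDED }

    /**
     * Returns the reason to stop, or null if more items should be started.
     * A null minSuccessful or toleratedFailureCount means that criterion is not configured.
     */
    static Reason evaluate(
            int total, int succeeded, int failed, Integer minSuccessful, Integer toleratedFailureCount) {
        // Stop once failures go beyond the tolerated count (0 behaves like allSuccessful()).
        if (toleratedFailureCount != null && failed > toleratedFailureCount) {
            return Reason.FAILURE_TOLERANCE_EXCEEDED;
        }
        // Stop once enough items have succeeded (1 behaves like firstSuccessful()).
        if (minSuccessful != null && succeeded >= minSuccessful) {
            return Reason.MIN_SUCCESSFUL_REACHED;
        }
        // Otherwise stop only when every item has finished.
        if (succeeded + failed == total) {
            return Reason.ALL_COMPLETED;
        }
        return null;
    }
}
```

For example, `evaluate(5, 2, 0, 2, null)` returns `MIN_SUCCESSFUL_REACHED`, which leaves three of the five items unstarted; in the real `BatchResult` those are the entries with `null` for both result and error.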

### Checkpoint-and-Replay

Map operations are fully durable. On replay after interruption:

- Completed items return cached results without re-execution
- Incomplete items resume from their last checkpoint
- Items that never started execute fresh

Results smaller than 256KB are checkpointed directly. Larger results are reconstructed from the individual child context checkpoints on replay.
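
The replay rules above can be illustrated with an in-memory checkpoint log. This is a deliberately simplified sketch: `ReplaySketch` and `mapWithReplay` are hypothetical, the log is a plain `Map` keyed by item index, and it models only the "completed items return cached results" and "items that never started execute fresh" cases, not resuming a partially completed item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of checkpoint-and-replay; not the SDK's implementation.
final class ReplaySketch {
    /** Maps each item, reusing any result already recorded in the checkpoint log. */
    static <I, O> List<O> mapWithReplay(List<I> items, Map<Integer, O> checkpoints, Function<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            if (!checkpoints.containsKey(i)) {
                // No checkpoint for this index: execute the item and record its result.
                checkpoints.put(i, fn.apply(items.get(i)));
            }
            // Completed items are served from the log without re-execution.
            out.add(checkpoints.get(i));
        }
        return out;
    }
}
```

Calling `mapWithReplay` a second time with the same log runs no item functions at all, which is the property that makes replay after an interruption safe for side-effecting work.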

### Input Collection Requirements

The input collection must have a deterministic iteration order, because each item is identified by its index on replay. `List` implementations (such as `ArrayList` and `LinkedList`) and `TreeSet` are accepted. `HashSet` and unordered map views are rejected with `IllegalArgumentException`.

```java
// OK
ctx.map("work", List.of("a", "b"), String.class, fn);
ctx.map("work", new ArrayList<>(items), String.class, fn);

// Throws IllegalArgumentException
ctx.map("work", new HashSet<>(items), String.class, fn);
```
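
How the SDK performs this validation is not shown here. As a rough sketch, assuming the check is a simple type test, it could look like the following; `OrderCheckSketch` and `requireDeterministicOrder` are hypothetical names, and the accepted types are limited to the ones named above.

```java
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;

// Hypothetical illustration of an ordering check; the SDK's actual rule may differ.
final class OrderCheckSketch {
    static void requireDeterministicOrder(Collection<?> items) {
        // Lists iterate by position and sorted sets (such as TreeSet) by comparator,
        // so both yield the same sequence on every replay.
        boolean ordered = items instanceof List || items instanceof SortedSet;
        if (!ordered) {
            throw new IllegalArgumentException(
                    "Collection type has no deterministic iteration order: "
                            + items.getClass().getName());
        }
    }
}
```

A type test like this is conservative: it would also reject insertion-ordered collections that are not lists, and copying such input into an `ArrayList` first sidesteps the question.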

### MapFunction Interface

The function passed to `map()` is a `MapFunction<I, O>`:

```java
@FunctionalInterface
public interface MapFunction<I, O> {
    O apply(DurableContext context, I item, int index) throws Exception;
    // Review comment (Contributor): Let's keep this consistent: DurableContext is always the last
    // parameter of user-provided functions.
    // Review comment (Contributor): Let's remove `throws Exception` to be consistent with all other
    // user-provided functions.
}
```

The `index` parameter is the zero-based position of the item in the input collection, useful for naming operations or correlating results.
11 changes: 11 additions & 0 deletions docs/design.md
@@ -59,6 +59,17 @@ DurableFuture<T> invokeAsync(String name, String functionName, U payload, Class<
DurableFuture<T> invokeAsync(String name, String functionName, U payload, TypeToken<T> resultType)
DurableFuture<T> invokeAsync(String name, String functionName, U payload, TypeToken<T> resultType, InvokeConfig config)

// Map
BatchResult<O> map(String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function)
BatchResult<O> map(String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function, MapConfig config)
BatchResult<O> map(String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function)
BatchResult<O> map(String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function, MapConfig config)

DurableFuture<BatchResult<O>> mapAsync(String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function)
DurableFuture<BatchResult<O>> mapAsync(String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function, MapConfig config)
DurableFuture<BatchResult<O>> mapAsync(String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function)
DurableFuture<BatchResult<O>> mapAsync(String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function, MapConfig config)

// Lambda context access
Context getLambdaContext()
```
1 change: 1 addition & 0 deletions examples/README.md
@@ -89,6 +89,7 @@ mvn test -Dtest=CloudBasedIntegrationTest \
| [RetryInProcessExample](src/main/java/com/amazonaws/lambda/durable/examples/RetryInProcessExample.java) | In-process retry with concurrent operations |
| [WaitAtLeastInProcessExample](src/main/java/com/amazonaws/lambda/durable/examples/WaitAtLeastInProcessExample.java) | Wait completes before async step (no suspension) |
| [ManyAsyncStepsExample](src/main/java/com/amazonaws/lambda/durable/examples/ManyAsyncStepsExample.java) | Performance test with 500 concurrent async steps |
| [SimpleMapExample](src/main/java/com/amazonaws/lambda/durable/examples/SimpleMapExample.java) | Concurrent map over a collection with durable steps |

## Cleanup

@@ -0,0 +1,73 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples;

import java.util.List;
import java.util.stream.Collectors;
import software.amazon.lambda.durable.CompletionConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.MapConfig;

/**
 * Example demonstrating MapConfig options: concurrency limiting and completion strategies.
 *
 * <p>This handler runs two map operations to showcase different configurations:
 *
 * <ol>
 *   <li>Sequential processing with {@code maxConcurrency(1)}: items run one at a time
 *   <li>Early termination with {@code minSuccessful(2)}: stops after 2 items succeed
 * </ol>
 */
public class MapConfigExample extends DurableHandler<GreetingRequest, String> {

    @Override
    public String handleRequest(GreetingRequest input, DurableContext context) {
        var name = input.getName();
        context.getLogger().info("Starting map config example for {}", name);

        // Part 1: Sequential execution with maxConcurrency=1
        var items = List.of("alpha", "beta", "gamma");
        var sequentialConfig = MapConfig.builder().maxConcurrency(1).build();

        var sequentialResult = context.map(
                "sequential-processing",
                items,
                String.class,
                (ctx, item, index) -> {
                    return ctx.step("transform-" + index, String.class, stepCtx -> item.toUpperCase() + "-" + name);
                },
                sequentialConfig);

        var sequentialOutput = String.join(", ", sequentialResult.results());
        context.getLogger().info("Sequential result: {}", sequentialOutput);

        // Part 2: Early termination with minSuccessful(2)
        var candidates = List.of("server-1", "server-2", "server-3", "server-4", "server-5");
        var earlyTermConfig = MapConfig.builder()
                .maxConcurrency(1)
                .completionConfig(CompletionConfig.minSuccessful(2))
                .build();

        var earlyTermResult = context.map(
                "find-healthy-servers",
                candidates,
                String.class,
                (ctx, server, index) -> {
                    return ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy");
                },
                earlyTermConfig);

        context.getLogger()
                .info(
                        "Early termination: reason={}, succeeded={}",
                        earlyTermResult.completionReason(),
                        earlyTermResult.succeeded().size());

        var healthyServers = earlyTermResult.succeeded().stream().collect(Collectors.joining(", "));

        return String.format(
                "sequential=[%s] | earlyTerm=[%s] reason=%s",
                sequentialOutput, healthyServers, earlyTermResult.completionReason());
    }
}
@@ -0,0 +1,78 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples;

import java.util.List;
import java.util.stream.Collectors;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.StepConfig;
import software.amazon.lambda.durable.retry.RetryStrategies;

/**
 * Example demonstrating error handling with the map operation.
 *
 * <p>Shows how individual item failures are isolated and captured in the {@code BatchResult}, while other items
 * continue to succeed. Demonstrates inspecting partial results using {@code allSucceeded()}, {@code getError()},
 * {@code succeeded()}, and {@code failed()}.
 *
 * <ol>
 *   <li>Map over a list of order IDs concurrently
 *   <li>Some orders intentionally fail to simulate real-world partial failures
 *   <li>Inspect the BatchResult to handle successes and failures separately
 * </ol>
 */
public class MapErrorHandlingExample extends DurableHandler<GreetingRequest, String> {

    @Override
    public String handleRequest(GreetingRequest input, DurableContext context) {
        var name = input.getName();
        context.getLogger().info("Starting map error handling example for {}", name);

        var orderIds = List.of("order-1", "order-INVALID", "order-3", "order-ERROR", "order-5");

        // Map over orders: some will fail, but the others continue processing
        var result = context.map("process-orders", orderIds, String.class, (ctx, orderId, index) -> {
            return ctx.step(
                    "process-" + index,
                    String.class,
                    stepCtx -> {
                        if (orderId.contains("INVALID")) {
                            throw new IllegalArgumentException("Invalid order: " + orderId);
                        }
                        if (orderId.contains("ERROR")) {
                            throw new RuntimeException("Processing error for: " + orderId);
                        }
                        return "Processed " + orderId + " for " + name;
                    },
                    StepConfig.builder()
                            .retryStrategy(RetryStrategies.Presets.NO_RETRY)
                            .build());
        });

        context.getLogger()
                .info(
                        "Map completed: allSucceeded={}, succeeded={}, failed={}",
                        result.allSucceeded(),
                        result.succeeded().size(),
                        result.failed().size());

        // Build a summary showing successful results and error messages
        var successSummary = result.succeeded().stream().collect(Collectors.joining(", "));

        var errorSummary = new StringBuilder();
        for (int i = 0; i < result.size(); i++) {
            if (result.getError(i) != null) {
                errorSummary.append(
                        String.format("index %d: %s; ", i, result.getError(i).getMessage()));
            }
        }
        // Review comment (Contributor): Let's add a wait before the end of each example so that we
        // know the replay works.

        return String.format(
                "succeeded=%d, failed=%d | results=[%s] | errors=[%s]",
                result.succeeded().size(),
                result.failed().size(),
                successSummary,
                errorSummary.toString().trim());
    }
}