Skip to content

Commit 2db6bfa

Browse files
committed
feat(map): Implement Map Operation
1 parent 61f3876 commit 2db6bfa

File tree

17 files changed

+963
-1
lines changed

17 files changed

+963
-1
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples;
4+
5+
import java.util.List;
6+
import software.amazon.lambda.durable.DurableContext;
7+
import software.amazon.lambda.durable.DurableHandler;
8+
9+
/**
10+
* Example demonstrating the map operation with the Durable Execution SDK.
11+
*
12+
* <p>This handler processes a list of names concurrently using {@code map()}, where each item runs in its own child
13+
* context with full checkpoint-and-replay support.
14+
*
15+
* <ol>
16+
* <li>Create a list of names from the input
17+
* <li>Map over each name concurrently, applying a greeting transformation via a durable step
18+
* <li>Collect and join the results
19+
* </ol>
20+
*/
21+
public class SimpleMapExample extends DurableHandler<GreetingRequest, String> {
22+
23+
@Override
24+
public String handleRequest(GreetingRequest input, DurableContext context) {
25+
var name = input.getName();
26+
context.getLogger().info("Starting map example for {}", name);
27+
28+
var names = List.of(name, name.toUpperCase(), name.toLowerCase());
29+
30+
// Map over each name concurrently — each iteration runs in its own child context
31+
var result = context.map("greet-all", names, String.class, (ctx, item, index) -> {
32+
return ctx.step("greet-" + index, String.class, stepCtx -> "Hello, " + item + "!");
33+
});
34+
35+
context.getLogger().info("Map completed: allSucceeded={}, size={}", result.allSucceeded(), result.size());
36+
37+
return String.join(" | ", result.results());
38+
}
39+
}

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,4 +539,13 @@ void testManyAsyncChildContextExample(int steps, long maxExecutionTime, long max
539539
assertTrue(minimalReplayTimeMs < maxReplayTime);
540540
assertTrue(minimalExecutionTimeMs < maxExecutionTime);
541541
}
542+
543+
@Test
544+
void testSimpleMapExample() {
545+
var runner = CloudDurableTestRunner.create(arn("simple-map-example"), GreetingRequest.class, String.class);
546+
var result = runner.run(new GreetingRequest("Alice"));
547+
548+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
549+
assertEquals("Hello, Alice! | Hello, ALICE! | Hello, alice!", result.getResult(String.class));
550+
}
542551
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import org.junit.jupiter.api.Test;
8+
import software.amazon.lambda.durable.model.ExecutionStatus;
9+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
10+
11+
class SimpleMapExampleTest {
12+
13+
@Test
14+
void testSimpleMapExample() {
15+
var handler = new SimpleMapExample();
16+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
17+
18+
var result = runner.runUntilComplete(new GreetingRequest("Alice"));
19+
20+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
21+
assertEquals("Hello, Alice! | Hello, ALICE! | Hello, alice!", result.getResult(String.class));
22+
}
23+
24+
@Test
25+
void testWithDefaultName() {
26+
var handler = new SimpleMapExample();
27+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
28+
29+
var result = runner.runUntilComplete(new GreetingRequest());
30+
31+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
32+
assertEquals("Hello, World! | Hello, WORLD! | Hello, world!", result.getResult(String.class));
33+
}
34+
35+
@Test
36+
void testReplay() {
37+
var handler = new SimpleMapExample();
38+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
39+
40+
var input = new GreetingRequest("Bob");
41+
var result1 = runner.runUntilComplete(input);
42+
assertEquals("Hello, Bob! | Hello, BOB! | Hello, bob!", result1.getResult(String.class));
43+
44+
// Replay — should use cached results
45+
var result2 = runner.runUntilComplete(input);
46+
assertEquals(result1.getResult(String.class), result2.getResult(String.class));
47+
}
48+
}

examples/template.yaml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,31 @@ Resources:
468468
DockerContext: ../
469469
DockerTag: durable-examples
470470

471+
SimpleMapExampleFunction:
472+
Type: AWS::Serverless::Function
473+
Properties:
474+
PackageType: Image
475+
FunctionName: !Join
476+
- ''
477+
- - 'simple-map-example'
478+
- !Ref FunctionNameSuffix
479+
ImageConfig:
480+
Command: ["software.amazon.lambda.durable.examples.SimpleMapExample::handleRequest"]
481+
DurableConfig:
482+
ExecutionTimeout: 300
483+
RetentionPeriodInDays: 7
484+
Policies:
485+
- Statement:
486+
- Effect: Allow
487+
Action:
488+
- lambda:CheckpointDurableExecutions
489+
- lambda:GetDurableExecutionState
490+
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:simple-map-example${FunctionNameSuffix}"
491+
Metadata:
492+
Dockerfile: !Ref DockerFile
493+
DockerContext: ../
494+
DockerTag: durable-examples
495+
471496
Outputs:
472497
NoopExampleFunction:
473498
Description: Noop Example Function ARN
@@ -613,3 +638,11 @@ Outputs:
613638
Description: Many Async Child Context Example Function Name
614639
Value: !Ref ManyAsyncChildContextExampleFunction
615640

641+
SimpleMapExampleFunction:
642+
Description: Simple Map Example Function ARN
643+
Value: !GetAtt SimpleMapExampleFunction.Arn
644+
645+
SimpleMapExampleFunctionName:
646+
Description: Simple Map Example Function Name
647+
Value: !Ref SimpleMapExampleFunction
648+
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import java.util.List;
8+
import org.junit.jupiter.api.Test;
9+
import software.amazon.lambda.durable.model.ExecutionStatus;
10+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
11+
12+
class MapIntegrationTest {
13+
14+
@Test
15+
void testSimpleMap() {
16+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
17+
var items = List.of("a", "b", "c");
18+
var result = context.map("process-items", items, String.class, (ctx, item, index) -> {
19+
return item.toUpperCase();
20+
});
21+
22+
assertTrue(result.allSucceeded());
23+
assertEquals(3, result.size());
24+
assertEquals("A", result.getResult(0));
25+
assertEquals("B", result.getResult(1));
26+
assertEquals("C", result.getResult(2));
27+
28+
return String.join(",", result.results());
29+
});
30+
31+
var result = runner.runUntilComplete("test");
32+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
33+
assertEquals("A,B,C", result.getResult(String.class));
34+
}
35+
36+
@Test
37+
void testMapWithStepsInsideBranches() {
38+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
39+
var items = List.of("hello", "world");
40+
var result = context.map("map-with-steps", items, String.class, (ctx, item, index) -> {
41+
return ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase());
42+
});
43+
44+
assertTrue(result.allSucceeded());
45+
return String.join(" ", result.results());
46+
});
47+
48+
var result = runner.runUntilComplete("test");
49+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
50+
assertEquals("HELLO WORLD", result.getResult(String.class));
51+
}
52+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable;
4+
5+
/**
6+
* A {@link DurableFuture} that is already completed with a value.
7+
*
8+
* <p>Used for short-circuit cases (e.g., empty collection in map) where no checkpoint or async execution is needed.
9+
*
10+
* @param <T> the result type
11+
*/
12+
class CompletedDurableFuture<T> implements DurableFuture<T> {
13+
private final T value;
14+
15+
CompletedDurableFuture(T value) {
16+
this.value = value;
17+
}
18+
19+
@Override
20+
public T get() {
21+
return value;
22+
}
23+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable;
4+
5+
/**
6+
* Controls when a concurrent operation (map or parallel) completes.
7+
*
8+
* <p>Provides factory methods for common completion strategies and fine-grained control via {@code minSuccessful},
9+
* {@code toleratedFailureCount}, and {@code toleratedFailurePercentage}.
10+
*/
11+
public class CompletionConfig {
12+
private final Integer minSuccessful;
13+
private final Integer toleratedFailureCount;
14+
private final Double toleratedFailurePercentage;
15+
16+
private CompletionConfig(Integer minSuccessful, Integer toleratedFailureCount, Double toleratedFailurePercentage) {
17+
this.minSuccessful = minSuccessful;
18+
this.toleratedFailureCount = toleratedFailureCount;
19+
this.toleratedFailurePercentage = toleratedFailurePercentage;
20+
}
21+
22+
/** All items must succeed. Zero failures tolerated. */
23+
public static CompletionConfig allSuccessful() {
24+
return new CompletionConfig(null, 0, null);
25+
}
26+
27+
/** All items run regardless of failures. Failures captured per-item. */
28+
public static CompletionConfig allCompleted() {
29+
return new CompletionConfig(null, null, null);
30+
}
31+
32+
/** Complete as soon as the first item succeeds. */
33+
public static CompletionConfig firstSuccessful() {
34+
return new CompletionConfig(1, null, null);
35+
}
36+
37+
/** @return minimum number of successful items required, or null if not set */
38+
public Integer minSuccessful() {
39+
return minSuccessful;
40+
}
41+
42+
/** @return maximum number of failures tolerated, or null if unlimited */
43+
public Integer toleratedFailureCount() {
44+
return toleratedFailureCount;
45+
}
46+
47+
/** @return maximum percentage of failures tolerated (0.0 to 1.0), or null if not set */
48+
public Double toleratedFailurePercentage() {
49+
return toleratedFailurePercentage;
50+
}
51+
}

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import com.amazonaws.services.lambda.runtime.Context;
66
import java.time.Duration;
7+
import java.util.Collection;
8+
import java.util.List;
79
import java.util.Objects;
810
import java.util.function.BiConsumer;
911
import java.util.function.Function;
@@ -14,11 +16,13 @@
1416
import software.amazon.lambda.durable.execution.OperationIdGenerator;
1517
import software.amazon.lambda.durable.execution.ThreadType;
1618
import software.amazon.lambda.durable.logging.DurableLogger;
19+
import software.amazon.lambda.durable.model.BatchResult;
1720
import software.amazon.lambda.durable.model.OperationIdentifier;
1821
import software.amazon.lambda.durable.model.OperationSubType;
1922
import software.amazon.lambda.durable.operation.CallbackOperation;
2023
import software.amazon.lambda.durable.operation.ChildContextOperation;
2124
import software.amazon.lambda.durable.operation.InvokeOperation;
25+
import software.amazon.lambda.durable.operation.MapOperation;
2226
import software.amazon.lambda.durable.operation.StepOperation;
2327
import software.amazon.lambda.durable.operation.WaitOperation;
2428
import software.amazon.lambda.durable.validation.ParameterValidator;
@@ -361,6 +365,83 @@ private <T> DurableFuture<T> runInChildContextAsync(
361365
return operation;
362366
}
363367

368+
// ========== map methods ==========
369+
370+
public <I, O> BatchResult<O> map(
371+
String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function) {
372+
return mapAsync(
373+
name,
374+
items,
375+
TypeToken.get(resultType),
376+
function,
377+
MapConfig.builder().build())
378+
.get();
379+
}
380+
381+
public <I, O> BatchResult<O> map(
382+
String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function, MapConfig config) {
383+
return mapAsync(name, items, TypeToken.get(resultType), function, config)
384+
.get();
385+
}
386+
387+
public <I, O> BatchResult<O> map(
388+
String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function) {
389+
return mapAsync(name, items, resultType, function, MapConfig.builder().build())
390+
.get();
391+
}
392+
393+
public <I, O> BatchResult<O> map(
394+
String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function, MapConfig config) {
395+
return mapAsync(name, items, resultType, function, config).get();
396+
}
397+
398+
public <I, O> DurableFuture<BatchResult<O>> mapAsync(
399+
String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function) {
400+
return mapAsync(
401+
name,
402+
items,
403+
TypeToken.get(resultType),
404+
function,
405+
MapConfig.builder().build());
406+
}
407+
408+
public <I, O> DurableFuture<BatchResult<O>> mapAsync(
409+
String name, Collection<I> items, Class<O> resultType, MapFunction<I, O> function, MapConfig config) {
410+
return mapAsync(name, items, TypeToken.get(resultType), function, config);
411+
}
412+
413+
public <I, O> DurableFuture<BatchResult<O>> mapAsync(
414+
String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function) {
415+
return mapAsync(name, items, resultType, function, MapConfig.builder().build());
416+
}
417+
418+
public <I, O> DurableFuture<BatchResult<O>> mapAsync(
419+
String name, Collection<I> items, TypeToken<O> resultType, MapFunction<I, O> function, MapConfig config) {
420+
Objects.requireNonNull(items, "items cannot be null");
421+
Objects.requireNonNull(function, "function cannot be null");
422+
Objects.requireNonNull(resultType, "resultType cannot be null");
423+
Objects.requireNonNull(config, "config cannot be null");
424+
ParameterValidator.validateOperationName(name);
425+
ParameterValidator.validateOrderedCollection(items);
426+
427+
if (config.serDes() == null) {
428+
config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build();
429+
}
430+
431+
// Short-circuit for empty collections — no checkpoint overhead
432+
if (items.isEmpty()) {
433+
return new CompletedDurableFuture<>(BatchResult.empty());
434+
}
435+
436+
// Convert to List for deterministic index-based access
437+
var itemList = List.copyOf(items);
438+
var operationId = nextOperationId();
439+
440+
var operation = new MapOperation<>(operationId, name, itemList, function, resultType, config, this);
441+
operation.execute();
442+
return operation;
443+
}
444+
364445
// ========= waitForCallback methods =============
365446
public <T> T waitForCallback(String name, Class<T> resultType, BiConsumer<String, StepContext> func) {
366447
return waitForCallbackAsync(

0 commit comments

Comments
 (0)