feat(map): Add completion config, BatchResult serialization, and thread-safe map execution#214
feat(map): Add completion config, BatchResult serialization, and thread-safe map execution#214zhongkechen merged 1 commit intomainfrom
Conversation
…ad-safe map execution
e3fe797 to
9fa7eaa
Compare
| ```java | ||
| @FunctionalInterface | ||
| public interface MapFunction<I, O> { | ||
| O apply(DurableContext context, I item, int index) throws Exception; |
There was a problem hiding this comment.
Let's keep this consistent: DurableContext is always the last parameter of user provided functions
There was a problem hiding this comment.
Let's remove throws Exception to be consistent with all other user provided functions
| errorSummary.append( | ||
| String.format("index %d: %s; ", i, result.getError(i).getMessage())); | ||
| } | ||
| } |
There was a problem hiding this comment.
let's add a wait before the end of each example so that we know the replay works.
| } | ||
|
|
||
| // Evaluate completion criteria | ||
| if (!earlyTermination && shouldTerminateEarly()) { |
There was a problem hiding this comment.
onChildContextComplete can be called concurrently by branches (each is a separate thread). So making earlyTermination a volatile isn't enough to prevent race condition. We have to add a lock around here to do check and modify of this variable, or make it an atomic variable.
| @@ -167,6 +187,7 @@ | |||
| var next = pendingQueue.poll(); | |||
There was a problem hiding this comment.
onChildContextComplete can be called concurrently by branches. The code here isn't thread safe. And also pendingQueue.poll could unexpectedly block the caller if it's empty. And the logic here can be reused in branchInternal
| var contextDetails = op.contextDetails(); | ||
| var result = (contextDetails != null) ? contextDetails.result() : null; | ||
| return deserializeResult(result); | ||
| } else if (op.status() == OperationStatus.FAILED) { |
There was a problem hiding this comment.
Does map have FAILED status?
| private final Integer maxConcurrency; | ||
| private final CompletionConfig completionConfig; | ||
| private final OperationSubType subType; | ||
| private volatile CompletionReason completionReason; |
There was a problem hiding this comment.
completionReason doesn't seem necessary to be volatile, does it?
…ad-safe map execution
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
Issue Link, if available
#39
Description
Improve MapOperation reliability by moving branch completion handling to CompletableFuture callbacks, tracking started branches explicitly, and checkpointing results synchronously from the context thread.
Add Jackson annotations to BatchResult for checkpoint serialization with graceful deserialization of errors. Add factory methods to CompletionConfig (minSuccessful, toleratedFailureCount, toleratedFailurePercentage). Add MapConfig/MapErrorHandling examples and comprehensive unit tests for CompletionConfig, MapConfig, MapFunction, BatchResult, ParameterValidator, and map integration.
Demo/Screenshots
Checklist
Testing
Unit Tests
Have unit tests been written for these changes? Yes
Integration Tests
Have integration tests been written for these changes? Yes
Examples
Has a new example been added for the change? (if applicable) Yes