Skip to content

feat(map): Add completion config, BatchResult serialization, and thread-safe map execution#214

Merged
zhongkechen merged 1 commit intomainfrom
map_feature_2
Mar 14, 2026
Merged

feat(map): Add completion config, BatchResult serialization, and thread-safe map execution#214
zhongkechen merged 1 commit intomainfrom
map_feature_2

Conversation

@ayushiahjolia
Copy link
Contributor

…ad-safe map execution

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Issue Link, if available

#39

Description

Improve MapOperation reliability by moving branch completion handling to CompletableFuture callbacks, tracking started branches explicitly, and checkpointing results synchronously from the context thread.

Add Jackson annotations to BatchResult for checkpoint serialization with graceful deserialization of errors. Add factory methods to CompletionConfig (minSuccessful, toleratedFailureCount, toleratedFailurePercentage). Add MapConfig/MapErrorHandling examples and comprehensive unit tests for CompletionConfig, MapConfig, MapFunction, BatchResult, ParameterValidator, and map integration.

Demo/Screenshots

Screenshot 2026-03-13 at 11 08 22 PM

Checklist

  • I have filled out every section of the PR template
  • I have thoroughly tested this change

Testing

Unit Tests

Have unit tests been written for these changes? Yes

Integration Tests

Have integration tests been written for these changes? Yes

Examples

Has a new example been added for the change? (if applicable) Yes

@ayushiahjolia ayushiahjolia requested a review from a team March 14, 2026 06:08
@ayushiahjolia ayushiahjolia self-assigned this Mar 14, 2026
```java
@FunctionalInterface
public interface MapFunction<I, O> {
O apply(DurableContext context, I item, int index) throws Exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this consistent: DurableContext is always the last parameter of user provided functions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove throws Exception to be consistent with all other user provided functions

@zhongkechen zhongkechen merged commit 966b2ea into main Mar 14, 2026
11 checks passed
@zhongkechen zhongkechen deleted the map_feature_2 branch March 14, 2026 06:36
errorSummary.append(
String.format("index %d: %s; ", i, result.getError(i).getMessage()));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a wait before the end of each example so that we know the replay works.

}

// Evaluate completion criteria
if (!earlyTermination && shouldTerminateEarly()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onChildContextComplete can be called concurrently by branches (each is a separate thread). So making earlyTermination a volatile isn't enough to prevent race condition. We have to add a lock around here to do check and modify of this variable, or make it an atomic variable.

@@ -167,6 +187,7 @@
var next = pendingQueue.poll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onChildContextComplete can be called concurrently by branches. The code here isn't thread safe. And also pendingQueue.poll could unexpectedly block the caller if it's empty. And the logic here can be reused in branchInternal

var contextDetails = op.contextDetails();
var result = (contextDetails != null) ? contextDetails.result() : null;
return deserializeResult(result);
} else if (op.status() == OperationStatus.FAILED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does map have FAILED status?

private final Integer maxConcurrency;
private final CompletionConfig completionConfig;
private final OperationSubType subType;
private volatile CompletionReason completionReason;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

completionReason doesn't seem necessary to be volatile, does it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants