Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class ConcurrencyOperation<T> extends BaseDurableOperation<T> {
private final Set<String> completedOperations = Collections.synchronizedSet(new HashSet<String>());
private ConcurrencyCompletionStatus completionStatus;
private OperationIdGenerator operationIdGenerator;
private final DurableContextImpl rootContext;

protected ConcurrencyOperation(
OperationIdentifier operationIdentifier,
Expand All @@ -73,6 +74,7 @@ protected ConcurrencyOperation(
this.toleratedFailureCount = toleratedFailureCount;
this.failureRateThreshold = failureRateThreshold;
this.operationIdGenerator = new OperationIdGenerator(getOperationId());
this.rootContext = durableContext.createChildContext(getOperationId(), getName());
}

protected ConcurrencyOperation(
Expand Down Expand Up @@ -142,7 +144,7 @@ public <R> ChildContextOperation<R> addItem(
String name, Function<DurableContext, R> function, TypeToken<R> resultType, SerDes serDes) {
if (isOperationCompleted()) throw new IllegalStateException("Cannot add items to a completed operation");
var operationId = this.operationIdGenerator.nextOperationId();
var childOp = createItem(operationId, name, function, resultType, serDes, getContext());
var childOp = createItem(operationId, name, function, resultType, serDes, this.rootContext);
childOperations.add(childOp);
pendingQueue.add(childOp);
logger.debug("Item added {}", name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ConcurrencyOperationTest {
private static final TypeToken<Void> RESULT_TYPE = TypeToken.get(Void.class);

private DurableContextImpl durableContext;
private DurableContextImpl childContext;
private ExecutionManager executionManager;
private AtomicInteger operationIdCounter;
private OperationIdGenerator mockIdGenerator;
Expand All @@ -48,11 +49,20 @@ void setUp() {
executionManager = mock(ExecutionManager.class);
operationIdCounter = new AtomicInteger(0);

var childContext = mock(DurableContextImpl.class);
this.childContext = childContext;
when(childContext.getExecutionManager()).thenReturn(executionManager);
when(childContext.getDurableConfig())
.thenReturn(DurableConfig.builder()
.withExecutorService(Executors.newCachedThreadPool())
.build());

when(durableContext.getExecutionManager()).thenReturn(executionManager);
when(durableContext.getDurableConfig())
.thenReturn(DurableConfig.builder()
.withExecutorService(Executors.newCachedThreadPool())
.build());
when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext);
when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT));
mockIdGenerator = mock(OperationIdGenerator.class);
when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet());
Expand Down Expand Up @@ -167,6 +177,18 @@ void singleChildAlreadySucceeds_fullCycle() throws Exception {
assertFalse(functionCalled.get(), "Function should not be called during SUCCEEDED replay");
}

@Test
void addItem_usesRootChildContextAsParent() throws Exception {
var op = createOperation(-1, -1, 0);

op.addItem("branch-1", ctx -> "result", TypeToken.get(String.class), SER_DES);

// rootContext is created via durableContext.createChildContext(...) in the constructor,
// so the parentContext passed to createItem must be that child context, not durableContext itself
assertNotSame(durableContext, op.getLastParentContext());
assertSame(childContext, op.getLastParentContext());
}

// ===== Helpers =====

private void runJoin(TestConcurrencyOperation op) throws InterruptedException {
Expand All @@ -182,6 +204,7 @@ static class TestConcurrencyOperation extends ConcurrencyOperation<Void> {
private boolean successHandled = false;
private boolean failureHandled = false;
private final AtomicInteger executingCount = new AtomicInteger(0);
private DurableContextImpl lastParentContext;

TestConcurrencyOperation(
OperationIdentifier operationIdentifier,
Expand Down Expand Up @@ -209,6 +232,7 @@ protected <R> ChildContextOperation<R> createItem(
TypeToken<R> resultType,
SerDes serDes,
DurableContextImpl parentContext) {
lastParentContext = parentContext;
return new ChildContextOperation<R>(
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL_BRANCH),
function,
Expand Down Expand Up @@ -260,5 +284,9 @@ boolean isSuccessHandled() {
boolean isFailureHandled() {
return failureHandled;
}

DurableContextImpl getLastParentContext() {
return lastParentContext;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,19 @@ void setUp() {
executionManager = mock(ExecutionManager.class);
operationIdCounter = new AtomicInteger(0);

var childContext = mock(DurableContextImpl.class);
when(childContext.getExecutionManager()).thenReturn(executionManager);
when(childContext.getDurableConfig())
.thenReturn(DurableConfig.builder()
.withExecutorService(Executors.newCachedThreadPool())
.build());

when(durableContext.getExecutionManager()).thenReturn(executionManager);
when(durableContext.getDurableConfig())
.thenReturn(DurableConfig.builder()
.withExecutorService(Executors.newCachedThreadPool())
.build());
when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext);
when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT));
// Default: no existing operations (fresh execution)
mockIdGenerator = mock(OperationIdGenerator.class);
Expand Down
Loading