Skip to content

Commit f4c5388

Browse files
committed
Use SendEventAction for entity ops to fix callEntity with Azure Functions extension
1 parent b536473 commit f4c5388

File tree

4 files changed

+339
-52
lines changed

4 files changed

+339
-52
lines changed

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,4 +757,4 @@ default Task<AutoCloseable> lockEntities(@Nonnull EntityInstanceId... entityIds)
757757
* Clears the orchestration's custom status.
758758
*/
759759
void clearCustomStatus();
760-
}
760+
}

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 108 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import com.fasterxml.jackson.databind.JsonNode;
66
import com.fasterxml.jackson.databind.ObjectMapper;
7+
import com.fasterxml.jackson.databind.node.ArrayNode;
8+
import com.fasterxml.jackson.databind.node.ObjectNode;
79
import com.google.protobuf.StringValue;
810
import com.google.protobuf.Timestamp;
911
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
@@ -64,7 +66,11 @@ public TaskOrchestratorResult execute(List<HistoryEvent> pastEvents, List<Histor
6466
while (context.processNextEvent()) { /* no method body */ }
6567
completed = true;
6668
} catch (OrchestratorBlockedException orchestratorBlockedException) {
67-
logger.fine("The orchestrator has yielded and will await for new events.");
69+
logger.info(String.format(
70+
"%s: Orchestrator yielded. Waiting for events. Outstanding event keys: %s, Pending actions: %d",
71+
context.instanceId,
72+
context.outstandingEvents.keySet(),
73+
context.pendingActions.size()));
6874
} catch (ContinueAsNewInterruption continueAsNewInterruption) {
6975
logger.fine("The orchestrator has continued as new.");
7076
context.complete(null);
@@ -376,22 +382,31 @@ public void signalEntity(EntityInstanceId entityId, String operationName, Object
376382
String requestId = this.newUUID().toString();
377383
String serializedInput = this.dataConverter.serialize(input);
378384

379-
EntityOperationSignaledEvent.Builder signalBuilder = EntityOperationSignaledEvent.newBuilder()
380-
.setRequestId(requestId)
381-
.setOperation(operationName)
382-
.setTargetInstanceId(StringValue.of(entityId.toString()));
385+
// Build DTFx RequestMessage JSON payload matching the legacy format that the
386+
// Azure Functions extension (DTFx backend) understands. The extension processes
387+
// entity messages as external events (SendEventAction), NOT the newer proto-native
388+
// SendEntityMessageAction which is designed for the DTS backend.
389+
ObjectNode requestMessage = JSON_MAPPER.createObjectNode();
390+
requestMessage.put("op", operationName);
391+
requestMessage.put("signal", true);
383392
if (serializedInput != null) {
384-
signalBuilder.setInput(StringValue.of(serializedInput));
393+
requestMessage.put("input", serializedInput);
385394
}
395+
requestMessage.put("id", requestId);
396+
String eventName = "op";
386397
if (options != null && options.getScheduledTime() != null) {
387-
signalBuilder.setScheduledTime(DataConverter.getTimestampFromInstant(options.getScheduledTime()));
398+
String scheduledTimeStr = options.getScheduledTime().toString();
399+
requestMessage.put("due", scheduledTimeStr);
400+
eventName = "op@" + scheduledTimeStr;
388401
}
389402

390403
this.pendingActions.put(id, OrchestratorAction.newBuilder()
391404
.setId(id)
392-
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
393-
.setEntityOperationSignaled(signalBuilder)
394-
.build())
405+
.setSendEvent(SendEventAction.newBuilder()
406+
.setInstance(OrchestrationInstance.newBuilder()
407+
.setInstanceId(entityId.toString()))
408+
.setName(eventName)
409+
.setData(StringValue.of(requestMessage.toString())))
395410
.build());
396411

397412
if (!this.isReplaying) {
@@ -429,27 +444,31 @@ public <V> Task<V> callEntity(EntityInstanceId entityId, String operationName, O
429444
String requestId = this.newUUID().toString();
430445
String serializedInput = this.dataConverter.serialize(input);
431446

432-
EntityOperationCalledEvent.Builder callBuilder = EntityOperationCalledEvent.newBuilder()
433-
.setRequestId(requestId)
434-
.setOperation(operationName)
435-
.setTargetInstanceId(StringValue.of(entityId.toString()))
436-
.setParentInstanceId(StringValue.of(this.instanceId));
437-
if (this.executionId != null) {
438-
callBuilder.setParentExecutionId(StringValue.of(this.executionId));
439-
}
447+
// Build DTFx RequestMessage JSON for entity call (two-way operation).
448+
// Uses SendEventAction (external event) instead of SendEntityMessageAction for
449+
// compatibility with the Azure Functions extension (DTFx backend).
450+
ObjectNode requestMessage = JSON_MAPPER.createObjectNode();
451+
requestMessage.put("op", operationName);
440452
if (serializedInput != null) {
441-
callBuilder.setInput(StringValue.of(serializedInput));
453+
requestMessage.put("input", serializedInput);
454+
}
455+
requestMessage.put("id", requestId);
456+
requestMessage.put("parent", this.instanceId);
457+
if (this.executionId != null) {
458+
requestMessage.put("parentExecution", this.executionId);
442459
}
443460

444461
this.pendingActions.put(id, OrchestratorAction.newBuilder()
445462
.setId(id)
446-
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
447-
.setEntityOperationCalled(callBuilder)
448-
.build())
463+
.setSendEvent(SendEventAction.newBuilder()
464+
.setInstance(OrchestrationInstance.newBuilder()
465+
.setInstanceId(entityId.toString()))
466+
.setName("op")
467+
.setData(StringValue.of(requestMessage.toString())))
449468
.build());
450469

451470
if (!this.isReplaying) {
452-
this.logger.fine(() -> String.format(
471+
this.logger.info(() -> String.format(
453472
"%s: calling entity '%s' operation '%s' (#%d) requestId=%s",
454473
this.instanceId,
455474
entityId,
@@ -520,20 +539,32 @@ public Task<AutoCloseable> lockEntities(List<EntityInstanceId> entityIds) {
520539
lockSet.add(eid.toString());
521540
}
522541

523-
// Send a lock request for each entity in sorted order
524-
for (int position = 0; position < sortedIds.size(); position++) {
542+
// Send a lock request to the FIRST entity in the sorted lock set.
543+
// DTFx entity infrastructure handles chaining the lock acquisition
544+
// through subsequent entities in the lock set.
545+
{
525546
int id = this.sequenceNumber++;
526-
EntityLockRequestedEvent.Builder lockBuilder = EntityLockRequestedEvent.newBuilder()
527-
.setCriticalSectionId(criticalSectionId)
528-
.addAllLockSet(lockSet)
529-
.setPosition(position)
530-
.setParentInstanceId(StringValue.of(this.instanceId));
547+
ObjectNode lockRequestMessage = JSON_MAPPER.createObjectNode();
548+
lockRequestMessage.putNull("op");
549+
lockRequestMessage.put("id", criticalSectionId);
550+
ArrayNode lockSetArray = lockRequestMessage.putArray("lockset");
551+
for (EntityInstanceId eid : sortedIds) {
552+
ObjectNode entityIdNode = JSON_MAPPER.createObjectNode();
553+
entityIdNode.put("name", eid.getName());
554+
entityIdNode.put("key", eid.getKey());
555+
lockSetArray.add(entityIdNode);
556+
}
557+
lockRequestMessage.put("pos", 0);
558+
lockRequestMessage.put("parent", this.instanceId);
531559

560+
String targetEntityId = lockSet.get(0);
532561
this.pendingActions.put(id, OrchestratorAction.newBuilder()
533562
.setId(id)
534-
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
535-
.setEntityLockRequested(lockBuilder)
536-
.build())
563+
.setSendEvent(SendEventAction.newBuilder()
564+
.setInstance(OrchestrationInstance.newBuilder()
565+
.setInstanceId(targetEntityId))
566+
.setName("op")
567+
.setData(StringValue.of(lockRequestMessage.toString())))
537568
.build());
538569
}
539570

@@ -561,16 +592,18 @@ public Task<AutoCloseable> lockEntities(List<EntityInstanceId> entityIds) {
561592
// Release all locks
562593
for (EntityInstanceId lockedEntity : sortedIds) {
563594
int unlockId = this.sequenceNumber++;
564-
EntityUnlockSentEvent.Builder unlockBuilder = EntityUnlockSentEvent.newBuilder()
565-
.setCriticalSectionId(criticalSectionId)
566-
.setParentInstanceId(StringValue.of(this.instanceId))
567-
.setTargetInstanceId(StringValue.of(lockedEntity.toString()));
595+
// Build DTFx ReleaseMessage JSON for releasing entity locks
596+
ObjectNode releaseMessage = JSON_MAPPER.createObjectNode();
597+
releaseMessage.put("parent", this.instanceId);
598+
releaseMessage.put("id", criticalSectionId);
568599

569600
this.pendingActions.put(unlockId, OrchestratorAction.newBuilder()
570601
.setId(unlockId)
571-
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
572-
.setEntityUnlockSent(unlockBuilder)
573-
.build())
602+
.setSendEvent(SendEventAction.newBuilder()
603+
.setInstance(OrchestrationInstance.newBuilder()
604+
.setInstanceId(lockedEntity.toString()))
605+
.setName("release")
606+
.setData(StringValue.of(releaseMessage.toString())))
574607
.build());
575608
}
576609

@@ -839,6 +872,13 @@ private void handleEventRaised(HistoryEvent e) {
839872
Queue<TaskRecord<?>> outstandingEventQueue = this.outstandingEvents.get(eventName);
840873
if (outstandingEventQueue == null) {
841874
// No code is waiting for this event. Buffer it in case user-code waits for it later.
875+
if (!this.isReplaying) {
876+
this.logger.info(() -> String.format(
877+
"%s: Received EventRaised '%s' but no outstanding waiter found. Buffering as unprocessed. Raw input: %s",
878+
this.instanceId,
879+
eventName,
880+
eventRaised.getInput().getValue()));
881+
}
842882
this.unprocessedEvents.add(e);
843883
return;
844884
}
@@ -856,6 +896,14 @@ private void handleEventRaised(HistoryEvent e) {
856896
// responses in a ResponseMessage JSON format: {"result":"<value>","errorMessage":...,"failureDetails":...}
857897
// We detect entity call responses by checking if the task record has an associated entityId.
858898
if (matchingTaskRecord.getEntityId() != null) {
899+
if (!this.isReplaying) {
900+
this.logger.info(() -> String.format(
901+
"%s: Routing EventRaised '%s' to entity response handler for entity '%s'. Raw result: %s",
902+
this.instanceId,
903+
eventName,
904+
matchingTaskRecord.getEntityId(),
905+
rawResult != null ? rawResult : "(null)"));
906+
}
859907
this.handleEntityResponseFromEventRaised(matchingTaskRecord, rawResult);
860908
} else {
861909
try {
@@ -954,7 +1002,7 @@ private void handleEntityResponseFromEventRaised(TaskRecord<?> matchingTaskRecor
9541002
String innerResult = (resultNode == null || resultNode.isNull()) ? null : resultNode.asText();
9551003

9561004
if (!this.isReplaying) {
957-
this.logger.fine(() -> String.format(
1005+
this.logger.info(() -> String.format(
9581006
"%s: Entity operation on '%s' completed via EventRaised with result: %s",
9591007
this.instanceId,
9601008
matchingTaskRecord.getEntityId(),
@@ -972,6 +1020,14 @@ private void handleEntityResponseFromEventRaised(TaskRecord<?> matchingTaskRecor
9721020
}
9731021
}
9741022

1023+
private void handleEventSent(HistoryEvent e) {
1024+
// During replay, remove the pending action so we don't re-send already-processed
1025+
// events. This applies to entity operations (signal, call, lock, unlock) which
1026+
// now use SendEventAction, as well as regular sendEvent calls.
1027+
int taskId = e.getEventId();
1028+
this.pendingActions.remove(taskId);
1029+
}
1030+
9751031
// region Entity event handlers (Phase 4)
9761032

9771033
private void handleEntityOperationSignaled(HistoryEvent e) {
@@ -1003,7 +1059,7 @@ private void handleEntityOperationCompleted(HistoryEvent e) {
10031059

10041060
Queue<TaskRecord<?>> outstandingQueue = this.outstandingEvents.get(requestId);
10051061
if (outstandingQueue == null) {
1006-
this.logger.warning("Discarding entity operation completed event with requestId=" + requestId + ": no waiter found");
1062+
this.logger.warning("Discarding entity operation completed event with requestId=" + requestId + ": no waiter found. Outstanding event keys: " + this.outstandingEvents.keySet());
10071063
return;
10081064
}
10091065

@@ -1015,7 +1071,7 @@ private void handleEntityOperationCompleted(HistoryEvent e) {
10151071
String rawResult = completedEvent.hasOutput() ? completedEvent.getOutput().getValue() : null;
10161072

10171073
if (!this.isReplaying) {
1018-
this.logger.fine(() -> String.format(
1074+
this.logger.info(() -> String.format(
10191075
"%s: Entity operation completed for requestId=%s with output: %s",
10201076
this.instanceId,
10211077
requestId,
@@ -1049,7 +1105,7 @@ private void handleEntityOperationFailed(HistoryEvent e) {
10491105
FailureDetails details = new FailureDetails(failedEvent.getFailureDetails());
10501106

10511107
if (!this.isReplaying) {
1052-
this.logger.fine(() -> String.format(
1108+
this.logger.info(() -> String.format(
10531109
"%s: Entity operation failed for requestId=%s: %s",
10541110
this.instanceId,
10551111
requestId,
@@ -1370,6 +1426,13 @@ private boolean processNextEvent() {
13701426
}
13711427

13721428
private void processEvent(HistoryEvent e) {
1429+
if (!this.isReplaying) {
1430+
this.logger.info(() -> String.format(
1431+
"%s: Processing new event: %s (eventId=%d)",
1432+
this.instanceId,
1433+
e.getEventTypeCase(),
1434+
e.getEventId()));
1435+
}
13731436
boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED;
13741437
if (this.isSuspended && !overrideSuspension) {
13751438
this.handleEventWhileSuspended(e);
@@ -1441,7 +1504,7 @@ private void processEvent(HistoryEvent e) {
14411504
this.handleSubOrchestrationFailed(e);
14421505
break;
14431506
case EVENTSENT:
1444-
// No action needed — sendEvent is fire-and-forget with no replay validation
1507+
this.handleEventSent(e);
14451508
break;
14461509
case EVENTRAISED:
14471510
this.handleEventRaised(e);

0 commit comments

Comments
 (0)