diff --git a/internal/error.go b/internal/error.go index ba844c0ca..469cfa1ba 100644 --- a/internal/error.go +++ b/internal/error.go @@ -822,6 +822,7 @@ func (e *ChildWorkflowExecutionError) Unwrap() error { return e.cause } +// Error implements the error interface. func (e *NexusOperationError) Error() string { msg := fmt.Sprintf( "%s (endpoint: %q, service: %q, operation: %q, operation ID: %q, scheduledEventID: %d)", @@ -842,6 +843,7 @@ func (e *NexusOperationError) failure() *failurepb.Failure { return e.Failure } +// Unwrap returns the Cause associated with this error. func (e *NexusOperationError) Unwrap() error { return e.Cause } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 0f58aa08d..df53899cb 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1697,14 +1697,18 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, obes [ eventAttributes := e.GetNexusOperationScheduledEventAttributes() commandAttributes := d.GetScheduleNexusOperationCommandAttributes() - if eventAttributes.GetService() != commandAttributes.GetService() || eventAttributes.GetOperation() != commandAttributes.GetOperation() { + return eventAttributes.GetService() == commandAttributes.GetService() && + eventAttributes.GetOperation() == commandAttributes.GetOperation() + + case enumspb.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION: + if e.GetEventType() != enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED { return false } - return true + eventAttributes := e.GetNexusOperationCancelRequestedEventAttributes() + commandAttributes := d.GetRequestCancelNexusOperationCommandAttributes() - case enumspb.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION: - return e.GetEventType() == enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED + return eventAttributes.GetScheduledEventId() == commandAttributes.GetScheduledEventId() } return false diff --git a/internal/workflow.go b/internal/workflow.go index ef0e2b91e..186c450b1 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -2125,8 +2125,10 @@ type NexusOperationOptions struct { ScheduleToCloseTimeout time.Duration } -// NexusOperationExecution is the result of [NexusOperationFuture.GetNexusOperationExecution]. +// NexusOperationExecution is the result of NexusOperationFuture.GetNexusOperationExecution. type NexusOperationExecution struct { + // Operation ID as set by the Operation's handler. May be empty if the operation hasn't started yet or completed + // synchronously. OperationID string } @@ -2141,10 +2143,17 @@ type NexusOperationFuture interface { // synchronous operations. // // NOTE: Experimental + // + // fut := nexusClient.ExecuteOperation(ctx, op, ...) + // var exec workflow.NexusOperationExecution + // if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err == nil { + // // Nexus Operation started, OperationID is optionally set. + // } GetNexusOperationExecution() Future } // NexusClient is a client for executing Nexus Operations from a workflow. +// NOTE to maintainers, this interface definition is duplicated in the workflow package to provide a better UX. type NexusClient interface { // The endpoint name this client uses. // @@ -2240,7 +2249,11 @@ func (wc *workflowEnvironmentInterceptor) ExecuteNexusOperation(ctx Context, cli var operationID string seq := wc.env.ExecuteNexusOperation(params, func(r *commonpb.Payload, e error) { - mainSettable.Set(&commonpb.Payloads{Payloads: []*commonpb.Payload{r}}, e) + var payloads *commonpb.Payloads + if r != nil { + payloads = &commonpb.Payloads{Payloads: []*commonpb.Payload{r}} + } + mainSettable.Set(payloads, e) if cancellable { // future is done, we don't need cancellation anymore ctxDone.removeReceiveCallback(cancellationCallback) diff --git a/test/nexus_test.go b/test/nexus_test.go index 5b518acb0..816b045eb 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -574,7 +574,7 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { } func TestReplay(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() tc := newTestContext(t, ctx) diff --git a/workflow/workflow.go b/workflow/workflow.go index 7766a5d53..33100c3df 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -72,10 +72,25 @@ type ( UpdateHandlerOptions = internal.UpdateHandlerOptions + // NOTE to maintainers, this interface definition is duplicated in the internal package to provide a better UX. + // NexusClient is a client for executing Nexus Operations from a workflow. - // - // NOTE: Experimental - NexusClient = internal.NexusClient + NexusClient interface { + // The endpoint name this client uses. + // + // NOTE: Experimental + Endpoint() string + // The service name this client uses. + // + // NOTE: Experimental + Service() string + + // ExecuteOperation executes a Nexus Operation. + // The operation argument can be a string, a [nexus.Operation] or a [nexus.OperationReference]. + // + // NOTE: Experimental + ExecuteOperation(ctx Context, operation any, input any, options NexusOperationOptions) NexusOperationFuture + } // NexusOperationOptions are options for starting a Nexus Operation from a Workflow. //