Skip to content

Commit

Permalink
Execute nexus operation from a workflow (#1473)
Browse files Browse the repository at this point in the history
* Execute nexus operation from a workflow

* Address review comments

* Add test environment support for Nexus Operations (#1475)

* Add test environment support for Nexus Operations

* Change client to not allow any direct calls
  • Loading branch information
bergundy authored Jun 19, 2024
1 parent 660c124 commit 646c0a1
Show file tree
Hide file tree
Showing 23 changed files with 2,128 additions and 123 deletions.
5 changes: 5 additions & 0 deletions interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ type HandleQueryInput = internal.HandleQueryInput
// NOTE: Experimental
type UpdateInput = internal.UpdateInput

// RequestCancelNexusOperationInput is the input to WorkflowOutboundInterceptor.RequestCancelNexusOperation.
//
// NOTE: Experimental
type RequestCancelNexusOperationInput = internal.RequestCancelNexusOperationInput

// WorkflowOutboundInterceptor is an interface for all workflow calls
// originating from the SDK.
//
Expand Down
5 changes: 2 additions & 3 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sync/atomic"
"time"

"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
Expand Down Expand Up @@ -649,7 +648,7 @@ type (
// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
requestID string
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
callbacks []*common.Callback
callbacks []*commonpb.Callback
}

// RetryPolicy defines the retry policy.
Expand Down Expand Up @@ -1004,6 +1003,6 @@ func SetRequestIDOnStartWorkflowOptions(opts *StartWorkflowOptions, requestID st

// SetCallbacksOnStartWorkflowOptions is an internal only method for setting callbacks on StartWorkflowOptions.
// Callbacks are purposefully not exposed to users for the time being.
func SetCallbacksOnStartWorkflowOptions(opts *StartWorkflowOptions, callbacks []*common.Callback) {
func SetCallbacksOnStartWorkflowOptions(opts *StartWorkflowOptions, callbacks []*commonpb.Callback) {
opts.callbacks = callbacks
}
48 changes: 48 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,28 @@ type (
cause error
}

// NexusOperationError is an error returned when a Nexus Operation has failed.
//
// NOTE: Experimental
NexusOperationError struct {
// The raw proto failure object this error was created from.
Failure *failurepb.Failure
// Error message.
Message string
// ID of the NexusOperationScheduled event.
ScheduledEventID int64
// Endpoint name.
Endpoint string
// Service name.
Service string
// Operation name.
Operation string
// Operation ID - may be empty if the operation completed synchronously.
OperationID string
// Chained cause - typically an ApplicationError or a CanceledError.
Cause error
}

// ChildWorkflowExecutionAlreadyStartedError is set as the cause of
// ChildWorkflowExecutionError when failure is due the child workflow having
// already started.
Expand Down Expand Up @@ -800,6 +822,32 @@ func (e *ChildWorkflowExecutionError) Unwrap() error {
return e.cause
}

// Error implements the error interface.
func (e *NexusOperationError) Error() string {
msg := fmt.Sprintf(
"%s (endpoint: %q, service: %q, operation: %q, operation ID: %q, scheduledEventID: %d)",
e.Message, e.Endpoint, e.Service, e.Operation, e.OperationID, e.ScheduledEventID)
if e.Cause != nil {
msg = fmt.Sprintf("%s: %v", msg, e.Cause)
}
return msg
}

// setFailure implements the failureHolder interface for consistency with other failure based errors..
func (e *NexusOperationError) setFailure(f *failurepb.Failure) {
e.Failure = f
}

// failure implements the failureHolder interface for consistency with other failure based errors.
func (e *NexusOperationError) failure() *failurepb.Failure {
return e.Failure
}

// Unwrap returns the Cause associated with this error.
func (e *NexusOperationError) Unwrap() error {
return e.Cause
}

// Error from error interface
func (*NamespaceNotFoundError) Error() string {
return "namespace not found"
Expand Down
20 changes: 20 additions & 0 deletions internal/failure_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ func (dfc *DefaultFailureConverter) ErrorToFailure(err error) *failurepb.Failure
RetryState: err.retryState,
}
failure.FailureInfo = &failurepb.Failure_ChildWorkflowExecutionFailureInfo{ChildWorkflowExecutionFailureInfo: failureInfo}
case *NexusOperationError:
failureInfo := &failurepb.NexusOperationFailureInfo{
ScheduledEventId: err.ScheduledEventID,
Endpoint: err.Endpoint,
Service: err.Service,
Operation: err.Operation,
OperationId: err.OperationID,
}
failure.FailureInfo = &failurepb.Failure_NexusOperationExecutionFailureInfo{NexusOperationExecutionFailureInfo: failureInfo}
default: // All unknown errors are considered to be retryable ApplicationFailureInfo.
failureInfo := &failurepb.ApplicationFailureInfo{
Type: getErrType(err),
Expand Down Expand Up @@ -254,6 +263,17 @@ func (dfc *DefaultFailureConverter) FailureToError(failure *failurepb.Failure) e
childWorkflowExecutionFailureInfo.GetRetryState(),
dfc.FailureToError(failure.GetCause()),
)
} else if info := failure.GetNexusOperationExecutionFailureInfo(); info != nil {
err = &NexusOperationError{
Message: failure.Message,
Cause: dfc.FailureToError(failure.GetCause()),
Failure: originalFailure,
ScheduledEventID: info.GetScheduledEventId(),
Endpoint: info.GetEndpoint(),
Service: info.GetService(),
Operation: info.GetOperation(),
OperationID: info.GetOperationId(),
}
}

if err == nil {
Expand Down
23 changes: 23 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,20 @@ type HandleQueryInput struct {
Args []interface{}
}

// RequestCancelNexusOperationInput is the input to WorkflowOutboundInterceptor.RequestCancelNexusOperation.
//
// NOTE: Experimental
type RequestCancelNexusOperationInput struct {
// Client that was used to start the operation.
Client NexusClient
// Operation name.
Operation any
// Operation ID. May be empty if the operation is synchronous or has not started yet.
ID string
// seq number. For internal use only.
seq int64
}

// WorkflowOutboundInterceptor is an interface for all workflow calls
// originating from the SDK. See documentation in the interceptor package for
// more details.
Expand Down Expand Up @@ -283,6 +297,15 @@ type WorkflowOutboundInterceptor interface {
// interceptor.WorkflowHeader will return a non-nil map for this context.
NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) error

// ExecuteNexusOperation intercepts NexusClient.ExecuteOperation.
//
// NOTE: Experimental
ExecuteNexusOperation(ctx Context, client NexusClient, operation any, input any, options NexusOperationOptions) NexusOperationFuture
// RequestCancelNexusOperation intercepts Nexus Operation cancelation via context.
//
// NOTE: Experimental
RequestCancelNexusOperation(ctx Context, input RequestCancelNexusOperationInput)

mustEmbedWorkflowOutboundInterceptorBase()
}

Expand Down
18 changes: 18 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,24 @@ func (w *WorkflowOutboundInterceptorBase) NewContinueAsNewError(
return w.Next.NewContinueAsNewError(ctx, wfn, args...)
}

// ExecuteNexusOperation implements
// WorkflowOutboundInterceptor.ExecuteNexusOperation.
func (w *WorkflowOutboundInterceptorBase) ExecuteNexusOperation(
ctx Context,
client NexusClient,
operation any,
input any,
options NexusOperationOptions,
) NexusOperationFuture {
return w.Next.ExecuteNexusOperation(ctx, client, operation, input, options)
}

// RequestCancelNexusOperation implements
// WorkflowOutboundInterceptor.RequestCancelNexusOperation.
func (w *WorkflowOutboundInterceptorBase) RequestCancelNexusOperation(ctx Context, input RequestCancelNexusOperationInput) {
w.Next.RequestCancelNexusOperation(ctx, input)
}

func (*WorkflowOutboundInterceptorBase) mustEmbedWorkflowOutboundInterceptorBase() {}

// ClientInterceptorBase is a default implementation of ClientInterceptor meant
Expand Down
Loading

0 comments on commit 646c0a1

Please sign in to comment.