Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute nexus operation from a workflow #1473

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ type HandleQueryInput = internal.HandleQueryInput
// NOTE: Experimental
type UpdateInput = internal.UpdateInput

// RequestCancelNexusOperationInput is the input to WorkflowOutboundInterceptor.RequestCancelNexusOperation.
//
// NOTE: Experimental
type RequestCancelNexusOperationInput = internal.RequestCancelNexusOperationInput

// WorkflowOutboundInterceptor is an interface for all workflow calls
// originating from the SDK.
//
Expand Down
5 changes: 2 additions & 3 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sync/atomic"
"time"

"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
Expand Down Expand Up @@ -649,7 +648,7 @@ type (
// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
requestID string
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
callbacks []*common.Callback
callbacks []*commonpb.Callback
}

// RetryPolicy defines the retry policy.
Expand Down Expand Up @@ -1004,6 +1003,6 @@ func SetRequestIDOnStartWorkflowOptions(opts *StartWorkflowOptions, requestID st

// SetCallbacksOnStartWorkflowOptions is an internal only method for setting callbacks on StartWorkflowOptions.
// Callbacks are purposefully not exposed to users for the time being.
func SetCallbacksOnStartWorkflowOptions(opts *StartWorkflowOptions, callbacks []*common.Callback) {
func SetCallbacksOnStartWorkflowOptions(opts *StartWorkflowOptions, callbacks []*commonpb.Callback) {
opts.callbacks = callbacks
}
48 changes: 48 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,28 @@ type (
cause error
}

// NexusOperationError is an error returned when a Nexus Operation has failed.
bergundy marked this conversation as resolved.
Show resolved Hide resolved
//
// NOTE: Experimental
NexusOperationError struct {
// The raw proto failure object this error was created from.
Failure *failurepb.Failure
// Error message.
Message string
// ID of the NexusOperationScheduled event.
ScheduledEventID int64
// Endpoint name.
Endpoint string
// Service name.
Service string
// Operation name.
Operation string
// Operation ID - may be empty if the operation completed synchronously.
OperationID string
// Chained cause - typically an ApplicationError or a CanceledError.
Cause error
}

// ChildWorkflowExecutionAlreadyStartedError is set as the cause of
// ChildWorkflowExecutionError when failure is due the child workflow having
// already started.
Expand Down Expand Up @@ -800,6 +822,32 @@ func (e *ChildWorkflowExecutionError) Unwrap() error {
return e.cause
}

// Error implements the error interface.
func (e *NexusOperationError) Error() string {
bergundy marked this conversation as resolved.
Show resolved Hide resolved
msg := fmt.Sprintf(
"%s (endpoint: %q, service: %q, operation: %q, operation ID: %q, scheduledEventID: %d)",
e.Message, e.Endpoint, e.Service, e.Operation, e.OperationID, e.ScheduledEventID)
if e.Cause != nil {
msg = fmt.Sprintf("%s: %v", msg, e.Cause)
}
return msg
}

// setFailure implements the failureHolder interface for consistency with other failure based errors..
func (e *NexusOperationError) setFailure(f *failurepb.Failure) {
e.Failure = f
}

// failure implements the failureHolder interface for consistency with other failure based errors.
func (e *NexusOperationError) failure() *failurepb.Failure {
return e.Failure
}

// Unwrap returns the Cause associated with this error.
func (e *NexusOperationError) Unwrap() error {
return e.Cause
}

// Error from error interface
func (*NamespaceNotFoundError) Error() string {
return "namespace not found"
Expand Down
20 changes: 20 additions & 0 deletions internal/failure_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ func (dfc *DefaultFailureConverter) ErrorToFailure(err error) *failurepb.Failure
RetryState: err.retryState,
}
failure.FailureInfo = &failurepb.Failure_ChildWorkflowExecutionFailureInfo{ChildWorkflowExecutionFailureInfo: failureInfo}
case *NexusOperationError:
bergundy marked this conversation as resolved.
Show resolved Hide resolved
failureInfo := &failurepb.NexusOperationFailureInfo{
ScheduledEventId: err.ScheduledEventID,
Endpoint: err.Endpoint,
Service: err.Service,
Operation: err.Operation,
OperationId: err.OperationID,
}
failure.FailureInfo = &failurepb.Failure_NexusOperationExecutionFailureInfo{NexusOperationExecutionFailureInfo: failureInfo}
default: // All unknown errors are considered to be retryable ApplicationFailureInfo.
failureInfo := &failurepb.ApplicationFailureInfo{
Type: getErrType(err),
Expand Down Expand Up @@ -254,6 +263,17 @@ func (dfc *DefaultFailureConverter) FailureToError(failure *failurepb.Failure) e
childWorkflowExecutionFailureInfo.GetRetryState(),
dfc.FailureToError(failure.GetCause()),
)
} else if info := failure.GetNexusOperationExecutionFailureInfo(); info != nil {
err = &NexusOperationError{
Message: failure.Message,
Cause: dfc.FailureToError(failure.GetCause()),
Failure: originalFailure,
ScheduledEventID: info.GetScheduledEventId(),
Endpoint: info.GetEndpoint(),
Service: info.GetService(),
Operation: info.GetOperation(),
OperationID: info.GetOperationId(),
}
}

if err == nil {
Expand Down
23 changes: 23 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,20 @@ type HandleQueryInput struct {
Args []interface{}
}

// RequestCancelNexusOperationInput is the input to WorkflowOutboundInterceptor.RequestCancelNexusOperation.
//
// NOTE: Experimental
type RequestCancelNexusOperationInput struct {
// Client that was used to start the operation.
Client NexusClient
// Operation name.
Operation any
// Operation ID. May be empty if the operation is synchronous or has not started yet.
ID string
// seq number. For internal use only.
seq int64
}

// WorkflowOutboundInterceptor is an interface for all workflow calls
// originating from the SDK. See documentation in the interceptor package for
// more details.
Expand Down Expand Up @@ -283,6 +297,15 @@ type WorkflowOutboundInterceptor interface {
// interceptor.WorkflowHeader will return a non-nil map for this context.
NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) error

// ExecuteNexusOperation intercepts NexusClient.ExecuteOperation.
//
// NOTE: Experimental
ExecuteNexusOperation(ctx Context, client NexusClient, operation any, input any, options NexusOperationOptions) NexusOperationFuture
// RequestCancelNexusOperation intercepts Nexus Operation cancelation via context.
//
// NOTE: Experimental
RequestCancelNexusOperation(ctx Context, input RequestCancelNexusOperationInput)

mustEmbedWorkflowOutboundInterceptorBase()
}

Expand Down
18 changes: 18 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,24 @@ func (w *WorkflowOutboundInterceptorBase) NewContinueAsNewError(
return w.Next.NewContinueAsNewError(ctx, wfn, args...)
}

// ExecuteNexusOperation implements
// WorkflowOutboundInterceptor.ExecuteNexusOperation.
func (w *WorkflowOutboundInterceptorBase) ExecuteNexusOperation(
ctx Context,
client NexusClient,
operation any,
input any,
options NexusOperationOptions,
) NexusOperationFuture {
return w.Next.ExecuteNexusOperation(ctx, client, operation, input, options)
}

// RequestCancelNexusOperation implements
// WorkflowOutboundInterceptor.RequestCancelNexusOperation.
func (w *WorkflowOutboundInterceptorBase) RequestCancelNexusOperation(ctx Context, input RequestCancelNexusOperationInput) {
w.Next.RequestCancelNexusOperation(ctx, input)
}

func (*WorkflowOutboundInterceptorBase) mustEmbedWorkflowOutboundInterceptorBase() {}

// ClientInterceptorBase is a default implementation of ClientInterceptor meant
Expand Down
Loading
Loading