Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed May 17, 2024
1 parent 4f2835b commit dcfa8cf
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 10 deletions.
2 changes: 2 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ func (e *ChildWorkflowExecutionError) Unwrap() error {
return e.cause
}

// Error implements the error interface.
func (e *NexusOperationError) Error() string {
msg := fmt.Sprintf(
"%s (endpoint: %q, service: %q, operation: %q, operation ID: %q, scheduledEventID: %d)",
Expand All @@ -842,6 +843,7 @@ func (e *NexusOperationError) failure() *failurepb.Failure {
return e.Failure
}

// Unwrap returns the Cause associated with this error.
func (e *NexusOperationError) Unwrap() error {
return e.Cause
}
Expand Down
12 changes: 8 additions & 4 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,14 +1697,18 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, obes [
eventAttributes := e.GetNexusOperationScheduledEventAttributes()
commandAttributes := d.GetScheduleNexusOperationCommandAttributes()

if eventAttributes.GetService() != commandAttributes.GetService() || eventAttributes.GetOperation() != commandAttributes.GetOperation() {
return eventAttributes.GetService() == commandAttributes.GetService() &&
eventAttributes.GetOperation() == commandAttributes.GetOperation()

case enumspb.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION:
if e.GetEventType() != enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED {
return false
}

return true
eventAttributes := e.GetNexusOperationCancelRequestedEventAttributes()
commandAttributes := d.GetRequestCancelNexusOperationCommandAttributes()

case enumspb.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION:
return e.GetEventType() == enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED
return eventAttributes.GetScheduledEventId() == commandAttributes.GetScheduledEventId()
}

return false
Expand Down
17 changes: 15 additions & 2 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2125,8 +2125,10 @@ type NexusOperationOptions struct {
ScheduleToCloseTimeout time.Duration
}

// NexusOperationExecution is the result of [NexusOperationFuture.GetNexusOperationExecution].
// NexusOperationExecution is the result of NexusOperationFuture.GetNexusOperationExecution.
type NexusOperationExecution struct {
// Operation ID as set by the Operation's handler. May be empty if the operation hasn't started yet or completed
// synchronously.
OperationID string
}

Expand All @@ -2141,10 +2143,17 @@ type NexusOperationFuture interface {
// synchronous operations.
//
// NOTE: Experimental
//
// fut := nexusClient.ExecuteOperation(ctx, op, ...)
// var exec workflow.NexusOperationExecution
// if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err == nil {
// // Nexus Operation started, OperationID is optionally set.
// }
GetNexusOperationExecution() Future
}

// NexusClient is a client for executing Nexus Operations from a workflow.
// NOTE to maintainers, this interface definition is duplicated in the workflow package to provide a better UX.
type NexusClient interface {
// The endpoint name this client uses.
//
Expand Down Expand Up @@ -2240,7 +2249,11 @@ func (wc *workflowEnvironmentInterceptor) ExecuteNexusOperation(ctx Context, cli

var operationID string
seq := wc.env.ExecuteNexusOperation(params, func(r *commonpb.Payload, e error) {
mainSettable.Set(&commonpb.Payloads{Payloads: []*commonpb.Payload{r}}, e)
var payloads *commonpb.Payloads
if r != nil {
payloads = &commonpb.Payloads{Payloads: []*commonpb.Payload{r}}
}
mainSettable.Set(payloads, e)
if cancellable {
// future is done, we don't need cancellation anymore
ctxDone.removeReceiveCallback(cancellationCallback)
Expand Down
2 changes: 1 addition & 1 deletion test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
}

func TestReplay(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tc := newTestContext(t, ctx)

Expand Down
21 changes: 18 additions & 3 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,25 @@ type (

UpdateHandlerOptions = internal.UpdateHandlerOptions

// NOTE to maintainers, this interface definition is duplicated in the internal package to provide a better UX.

// NexusClient is a client for executing Nexus Operations from a workflow.
//
// NOTE: Experimental
NexusClient = internal.NexusClient
NexusClient interface {
// The endpoint name this client uses.
//
// NOTE: Experimental
Endpoint() string
// The service name this client uses.
//
// NOTE: Experimental
Service() string

// ExecuteOperation executes a Nexus Operation.
// The operation argument can be a string, a [nexus.Operation] or a [nexus.OperationReference].
//
// NOTE: Experimental
ExecuteOperation(ctx Context, operation any, input any, options NexusOperationOptions) NexusOperationFuture
}

// NexusOperationOptions are options for starting a Nexus Operation from a Workflow.
//
Expand Down

0 comments on commit dcfa8cf

Please sign in to comment.