Nexus Standalone: Terminate + Cancel#9624
Nexus Standalone: Terminate + Cancel#9624stephanos wants to merge 20 commits intofeature/nexus-standalonefrom
Conversation
a69b3a4 to
3bc48be
Compare
| len(req.GetIdentity()), config.MaxIDLengthLimit()) | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
^ I considered re-using these by introducing an interface or two, but then again ... maybe that's overkill.
5a99e83 to
c7313a0
Compare
| s.Contains(err.Error(), "operation_id is required") | ||
| }) | ||
| } | ||
|
|
There was a problem hiding this comment.
^ will need to verify and maybe tweak tests once everything's implemented
c7313a0 to
d1b9216
Compare
bergundy
left a comment
There was a problem hiding this comment.
The only thing that's missing for me is request ID dedupe.
chasm/lib/nexusoperation/handler.go
Outdated
| }) | ||
|
|
||
| resp, _, err := chasm.UpdateComponent(ctx, ref, func(o *Operation, ctx chasm.MutableContext, req *nexusoperationpb.TerminateNexusOperationRequest) (*nexusoperationpb.TerminateNexusOperationResponse, error) { | ||
| _, err := o.Terminate(ctx, chasm.TerminateComponentRequest{ |
There was a problem hiding this comment.
Missing a request ID here (not sure if this is already in the API but it should be.
chasm/lib/nexusoperation/handler.go
Outdated
| }) | ||
|
|
||
| resp, _, err := chasm.UpdateComponent(ctx, ref, func(o *Operation, ctx chasm.MutableContext, req *nexusoperationpb.RequestCancelNexusOperationRequest) (*nexusoperationpb.RequestCancelNexusOperationResponse, error) { | ||
| if err := o.Cancel(ctx, nil); err != nil { |
There was a problem hiding this comment.
Let's dedupe here based on a cancel request ID. See SAA for reference.
| len(req.GetIdentity()), config.MaxIDLengthLimit()) | ||
| } | ||
| return nil | ||
| } |
| func validateRequestCancelNexusOperationExecutionRequest(req *workflowservice.RequestCancelNexusOperationExecutionRequest, config *Config) error { | ||
| if req.GetOperationId() == "" { | ||
| return serviceerror.NewInvalidArgument("operation_id is required") | ||
| } | ||
| if len(req.GetOperationId()) > config.MaxIDLengthLimit() { | ||
| return serviceerror.NewInvalidArgumentf("operation_id exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetOperationId()), config.MaxIDLengthLimit()) | ||
| } | ||
| if len(req.GetRunId()) > config.MaxIDLengthLimit() { | ||
| return serviceerror.NewInvalidArgumentf("run_id exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetRunId()), config.MaxIDLengthLimit()) | ||
| } | ||
| if len(req.GetIdentity()) > config.MaxIDLengthLimit() { | ||
| return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetIdentity()), config.MaxIDLengthLimit()) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func validateTerminateNexusOperationExecutionRequest(req *workflowservice.TerminateNexusOperationExecutionRequest, config *Config) error { | ||
| if req.GetOperationId() == "" { | ||
| return serviceerror.NewInvalidArgument("operation_id is required") | ||
| } | ||
| if len(req.GetOperationId()) > config.MaxIDLengthLimit() { | ||
| return serviceerror.NewInvalidArgumentf("operation_id exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetOperationId()), config.MaxIDLengthLimit()) | ||
| } | ||
| if len(req.GetRunId()) > config.MaxIDLengthLimit() { | ||
| return serviceerror.NewInvalidArgumentf("run_id exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetRunId()), config.MaxIDLengthLimit()) | ||
| } | ||
| if len(req.GetIdentity()) > config.MaxIDLengthLimit() { | ||
| return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetIdentity()), config.MaxIDLengthLimit()) | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
We need a request ID on both of these and to validate it
f8c16bc to
49cb758
Compare
1b50884 to
c4dcd26
Compare
c4dcd26 to
db6cfb6
Compare
|
|
||
| // The reason for requesting cancellation. | ||
| string reason = 10; | ||
| } |
There was a problem hiding this comment.
^ mimicking ActivityCancelState
bbd18bd to
47463a8
Compare
47463a8 to
9639961
Compare
| len(req.GetIdentity()), config.MaxIDLengthLimit()) | ||
| } | ||
|
|
||
| // TODO: use different config for limit |
There was a problem hiding this comment.
As the comment says, I'll leave this here for now (mimics SAA) but we're investigating a better option
There was a problem hiding this comment.
Was failing tests after rebase
| } | ||
|
|
||
| message NexusOperationTerminateState { | ||
| string request_id = 1; |
There was a problem hiding this comment.
Please add a comment explaining what this is and how it is used.
chasm/lib/nexusoperation/frontend.go
Outdated
| } | ||
|
|
||
| if req.GetRequestId() == "" { | ||
| // Since this mutates the request, we clone it first so that any retries use the original request. |
There was a problem hiding this comment.
Doesn't seem necessary and would be preferable if we used the same request ID for retries.
There was a problem hiding this comment.
Also, can you say when this request would ever be retried from this location? I think I'm missing something.
There was a problem hiding this comment.
Right; this is verbatim from SAA (I assumed what's there was reviewed and is a blessed pattern). I hadn't seen it before either ... 👀
There was a problem hiding this comment.
I tracked this down and I believe this was an overcorrection on the SAA team; I've let them know.
| })) | ||
| } | ||
|
|
||
| func (o *Operation) handleCancellationRequested( |
There was a problem hiding this comment.
Should this be called requestCancel for consistency with Terminate?
There was a problem hiding this comment.
Also followed the SAA naming here; but I also like requestCancel 👍
|
|
||
| return chasm.TerminateComponentResponse{}, nil | ||
| } | ||
|
|
There was a problem hiding this comment.
There's nothing transitioning to the terminated state here... You should also store the terminate reason somehow. I would look at SAA to figure out the best place to store it.
There was a problem hiding this comment.
Right; that's why I have "unimplemented" there. I was under the impression that isn't available yet; ie blocked on the HSM to CHASM migration. Let me take a closer look again.
There was a problem hiding this comment.
Talked offline; it's in scope here - added now.
| return chasm.TerminateComponentResponse{}, nil | ||
| } | ||
|
|
||
| return chasm.TerminateComponentResponse{}, serviceerror.NewUnimplemented("not implemented") |
There was a problem hiding this comment.
| return chasm.TerminateComponentResponse{}, serviceerror.NewUnimplemented("not implemented") | |
| return chasm.TerminateComponentResponse{}, nil |
There was a problem hiding this comment.
Not quite as we'll need a transition - added now.
| return o.handleCancellation(ctx, newCancellation(req)) | ||
| } | ||
|
|
||
| func (o *Operation) handleCancellation( |
There was a problem hiding this comment.
Do we need this separate method?
There was a problem hiding this comment.
Yes, unless we should duplicate the code since it's called from both Cancel and requestCancel?
baa41d7 to
808f1f6
Compare
| } else if len(req.GetRequestId()) > config.MaxIDLengthLimit() { | ||
| return serviceerror.NewInvalidArgumentf("request_id exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetRequestId()), config.MaxIDLengthLimit()) | ||
| } |
There was a problem hiding this comment.
Moved them all here from frontend; and to the top for easier scanning.
| // Request ID used to deduplicate terminate requests. | ||
| string request_id = 1; | ||
| // The identity of the client who requested termination. | ||
| string identity = 2; |
There was a problem hiding this comment.
Need this to store the identity, too. Even though there's no way to query it for the user (yet?).
adf30e8 to
65847d9
Compare
How did you test it?
Can't verify functional tests yet due to missing impl.