Skip to content

Commit e40642e

Browse files
rodrigozhoudnr
authored andcommitted
Update nexus link converter (#6460)
## What changed? <!-- Describe what has changed in this PR --> Update nexus link converter ## Why? <!-- Tell your future self why have you made these changes --> Sync with code in SDK. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent a505b1a commit e40642e

File tree

6 files changed

+227
-65
lines changed

6 files changed

+227
-65
lines changed

components/nexusoperations/executors.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,6 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
184184
return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err)
185185
}
186186

187-
nexusLink, err := ConvertLinkWorkflowEventToNexusLink(args.workflowEventLink)
188-
if err != nil {
189-
return err
190-
}
191-
192187
callCtx, cancel := context.WithTimeout(
193188
ctx,
194189
e.Config.RequestTimeout(ns.Name().String(), task.EndpointName),
@@ -204,7 +199,7 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
204199
CallbackHeader: nexus.Header{
205200
commonnexus.CallbackTokenHeader: token,
206201
},
207-
Links: []nexus.Link{nexusLink},
202+
Links: []nexus.Link{args.nexusLink},
208203
})
209204

210205
methodTag := metrics.NexusMethodTag("StartOperation")
@@ -260,7 +255,7 @@ type startArgs struct {
260255
endpointID string
261256
header map[string]string
262257
payload *commonpb.Payload
263-
workflowEventLink *commonpb.Link_WorkflowEvent
258+
nexusLink nexus.Link
264259
namespaceFailoverVersion int64
265260
}
266261

@@ -292,7 +287,7 @@ func (e taskExecutor) loadOperationArgs(
292287
}
293288
args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput()
294289
args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader()
295-
args.workflowEventLink = &commonpb.Link_WorkflowEvent{
290+
args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{
296291
Namespace: ns.Name().String(),
297292
WorkflowId: ref.WorkflowKey.WorkflowID,
298293
RunId: ref.WorkflowKey.RunID,
@@ -302,7 +297,7 @@ func (e taskExecutor) loadOperationArgs(
302297
EventType: event.GetEventType(),
303298
},
304299
},
305-
}
300+
})
306301
args.namespaceFailoverVersion = event.Version
307302
return nil
308303
})

components/nexusoperations/executors_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ func TestProcessInvocationTask(t *testing.T) {
9999
},
100100
},
101101
}
102-
handlerNexusLink, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink)
103-
require.NoError(t, err)
102+
handlerNexusLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink)
104103

105104
cases := []struct {
106105
name string

components/nexusoperations/link_converter.go

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,59 @@
2020
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2121
// THE SOFTWARE.
2222

23+
// This file is duplicated in temporalio/temporal/components/nexusoperations/link_converter.go
24+
// Any changes here or there must be replicated. This is temporary until the
25+
// temporal repo updates to the most recent SDK version.
26+
2327
package nexusoperations
2428

2529
import (
2630
"fmt"
2731
"net/url"
32+
"regexp"
2833
"strconv"
29-
"strings"
3034

3135
"github.com/nexus-rpc/sdk-go/nexus"
3236
commonpb "go.temporal.io/api/common/v1"
3337
enumspb "go.temporal.io/api/enums/v1"
3438
)
3539

3640
const (
41+
urlSchemeTemporalKey = "temporal"
42+
urlPathNamespaceKey = "namespace"
43+
urlPathWorkflowIDKey = "workflowID"
44+
urlPathRunIDKey = "runID"
45+
urlPathTemplate = "/namespaces/%s/workflows/%s/%s/history"
46+
urlTemplate = "temporal://" + urlPathTemplate
47+
3748
linkWorkflowEventReferenceTypeKey = "referenceType"
3849
linkEventReferenceEventIDKey = "eventID"
3950
linkEventReferenceEventTypeKey = "eventType"
4051
)
4152

42-
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) (nexus.Link, error) {
43-
u, err := url.Parse(fmt.Sprintf(
44-
"temporal:///namespaces/%s/workflows/%s/%s/history",
45-
url.PathEscape(we.GetNamespace()),
46-
url.PathEscape(we.GetWorkflowId()),
47-
url.PathEscape(we.GetRunId()),
53+
var (
54+
rePatternNamespace = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathNamespaceKey)
55+
rePatternWorkflowID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathWorkflowIDKey)
56+
rePatternRunID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathRunIDKey)
57+
urlPathRE = regexp.MustCompile(fmt.Sprintf(
58+
`^/namespaces/%s/workflows/%s/%s/history$`,
59+
rePatternNamespace,
60+
rePatternWorkflowID,
61+
rePatternRunID,
4862
))
49-
if err != nil {
50-
return nexus.Link{}, err
63+
)
64+
65+
// ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.
66+
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link {
67+
u := &url.URL{
68+
Scheme: urlSchemeTemporalKey,
69+
Path: fmt.Sprintf(urlPathTemplate, we.GetNamespace(), we.GetWorkflowId(), we.GetRunId()),
70+
RawPath: fmt.Sprintf(
71+
urlPathTemplate,
72+
url.PathEscape(we.GetNamespace()),
73+
url.PathEscape(we.GetWorkflowId()),
74+
url.PathEscape(we.GetRunId()),
75+
),
5176
}
5277

5378
switch ref := we.GetReference().(type) {
@@ -57,9 +82,10 @@ func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) (nexus
5782
return nexus.Link{
5883
URL: u,
5984
Type: string(we.ProtoReflect().Descriptor().FullName()),
60-
}, nil
85+
}
6186
}
6287

88+
// ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
6389
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error) {
6490
we := &commonpb.Link_WorkflowEvent{}
6591
if link.Type != string(we.ProtoReflect().Descriptor().FullName()) {
@@ -70,54 +96,76 @@ func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_Workfl
7096
)
7197
}
7298

73-
if link.URL.Scheme != "temporal" {
74-
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")
99+
if link.URL.Scheme != urlSchemeTemporalKey {
100+
return nil, fmt.Errorf(
101+
"failed to parse link to Link_WorkflowEvent: invalid scheme: %s",
102+
link.URL.Scheme,
103+
)
104+
}
105+
106+
matches := urlPathRE.FindStringSubmatch(link.URL.EscapedPath())
107+
if len(matches) != 4 {
108+
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: malformed URL path")
75109
}
76110

77-
pathParts := strings.Split(link.URL.Path, "/")
78-
if len(pathParts) != 7 {
79-
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")
111+
var err error
112+
we.Namespace, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathNamespaceKey)])
113+
if err != nil {
114+
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
80115
}
81-
if pathParts[0] != "" || pathParts[1] != "namespaces" || pathParts[3] != "workflows" || pathParts[6] != "history" {
82-
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")
116+
117+
we.WorkflowId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathWorkflowIDKey)])
118+
if err != nil {
119+
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
83120
}
84-
we.Namespace = pathParts[2]
85-
we.WorkflowId = pathParts[4]
86-
we.RunId = pathParts[5]
87-
switch link.URL.Query().Get(linkWorkflowEventReferenceTypeKey) {
121+
122+
we.RunId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathRunIDKey)])
123+
if err != nil {
124+
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
125+
}
126+
127+
switch refType := link.URL.Query().Get(linkWorkflowEventReferenceTypeKey); refType {
88128
case string((&commonpb.Link_WorkflowEvent_EventReference{}).ProtoReflect().Descriptor().Name()):
89129
eventRef, err := convertURLQueryToLinkWorkflowEventEventReference(link.URL.Query())
90130
if err != nil {
91-
return nil, err
131+
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
92132
}
93133
we.Reference = &commonpb.Link_WorkflowEvent_EventRef{
94134
EventRef: eventRef,
95135
}
136+
default:
137+
return nil, fmt.Errorf(
138+
"failed to parse link to Link_WorkflowEvent: unknown reference type: %q",
139+
refType,
140+
)
96141
}
97142

98143
return we, nil
99144
}
100145

101146
func convertLinkWorkflowEventEventReferenceToURLQuery(eventRef *commonpb.Link_WorkflowEvent_EventReference) string {
102-
values := url.Values{
103-
linkWorkflowEventReferenceTypeKey: []string{string(eventRef.ProtoReflect().Descriptor().Name())},
104-
linkEventReferenceEventIDKey: []string{strconv.FormatInt(eventRef.GetEventId(), 10)},
105-
linkEventReferenceEventTypeKey: []string{eventRef.GetEventType().String()},
147+
values := url.Values{}
148+
values.Set(linkWorkflowEventReferenceTypeKey, string(eventRef.ProtoReflect().Descriptor().Name()))
149+
if eventRef.GetEventId() > 0 {
150+
values.Set(linkEventReferenceEventIDKey, strconv.FormatInt(eventRef.GetEventId(), 10))
106151
}
152+
values.Set(linkEventReferenceEventTypeKey, eventRef.GetEventType().String())
107153
return values.Encode()
108154
}
109155

110156
func convertURLQueryToLinkWorkflowEventEventReference(queryValues url.Values) (*commonpb.Link_WorkflowEvent_EventReference, error) {
111-
eventID, err := strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64)
112-
if err != nil {
113-
return nil, err
157+
var err error
158+
eventRef := &commonpb.Link_WorkflowEvent_EventReference{}
159+
eventIDValue := queryValues.Get(linkEventReferenceEventIDKey)
160+
if eventIDValue != "" {
161+
eventRef.EventId, err = strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64)
162+
if err != nil {
163+
return nil, err
164+
}
114165
}
115-
eventType, err := enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey))
166+
eventRef.EventType, err = enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey))
116167
if err != nil {
117168
return nil, err
118169
}
119-
return &commonpb.Link_WorkflowEvent_EventReference{
120-
EventId: eventID,
121-
EventType: eventType,
122-
}, nil
170+
return eventRef, nil
123171
}

0 commit comments

Comments
 (0)