Skip to content

Commit 6e86ec7

Browse files
committed
proto reflect
1 parent 4836146 commit 6e86ec7

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

workflow/utils.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,26 @@ func (wf *Workflow) stepResultToGrpc(s *stepResult, reply interface{}) error {
124124
return s.Error
125125
}
126126

127+
func (wf *Workflow) stepResultFromKitex(reply interface{}, err error) *stepResult {
128+
sr := &stepResult{Error: wf.Options.GRPCError2DtmError(err)}
129+
if err != nil {
130+
sr.Status = dtmcli.StatusFailed
131+
}
132+
if sr.Error == nil {
133+
sr.Data = dtmimp.MustMarshal(reply)
134+
} else if sr.Status == dtmcli.StatusFailed {
135+
sr.Data = []byte(err.Error())
136+
}
137+
return sr
138+
}
139+
140+
func (wf *Workflow) stepResultToKitex(s *stepResult, reply interface{}) error {
141+
if s.Error == nil && s.Status == dtmcli.StatusSucceed {
142+
dtmimp.MustUnmarshal(s.Data, &reply)
143+
}
144+
return s.Error
145+
}
146+
127147
func (wf *Workflow) stepResultFromHTTP(resp *http.Response, err error) *stepResult {
128148
sr := &stepResult{Error: err}
129149
if err == nil {

workflow/workflow.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"github.com/cloudwego/kitex/pkg/endpoint"
88
"github.com/cloudwego/kitex/pkg/rpcinfo"
9+
"google.golang.org/protobuf/reflect/protoreflect"
910
"net/http"
1011
"net/url"
1112

@@ -264,6 +265,10 @@ func Interceptor(ctx context.Context, method string, req, reply interface{}, cc
264265
return wf.stepResultToGrpc(sr, reply)
265266
}
266267

268+
type KitexProtoReflect interface {
269+
ProtoReflect() protoreflect.Message
270+
}
271+
267272
// KitexInterceptor is the middleware for workflow to capture grpc call result
268273
func KitexInterceptor(next endpoint.Endpoint) endpoint.Endpoint {
269274
return func(ctx context.Context, req, resp interface{}) (err error) {
@@ -292,9 +297,9 @@ func KitexInterceptor(next endpoint.Endpoint) endpoint.Endpoint {
292297
}
293298
sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
294299
err := origin()
295-
return wf.stepResultFromGrpc(resp, err)
300+
return wf.stepResultFromKitex(resp, err)
296301
})
297-
return wf.stepResultToGrpc(sr, resp)
302+
return wf.stepResultToKitex(sr, resp)
298303
}
299304

300305
}

0 commit comments

Comments
 (0)