From fcb001aa35bde9bbdaa70be6945cb60bdf3300cd Mon Sep 17 00:00:00 2001 From: Marina-Sakai Date: Tue, 21 Jan 2025 10:18:50 +0800 Subject: [PATCH] fix: message reader writer --- client/genericclient/client.go | 3 -- .../genericclient/generic_stream_service.go | 19 +++++---- pkg/generic/generic_service.go | 39 +++++++++++++++++-- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/client/genericclient/client.go b/client/genericclient/client.go index 902f93c671..4c74c92dfa 100644 --- a/client/genericclient/client.go +++ b/client/genericclient/client.go @@ -96,8 +96,6 @@ func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, ctx = client.NewCtxWithCallOptions(ctx, callOptions) mtInfo := gc.svcInfo.MethodInfo(method) _args := mtInfo.NewArgs().(*generic.Args) - codec := gc.g.MessageReaderWriter() - _args.SetCodec(codec) _args.Method = method _args.Request = request @@ -110,7 +108,6 @@ func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, } _result := mtInfo.NewResult().(*generic.Result) - _result.SetCodec(codec) if err = gc.kClient.Call(ctx, mt.Name, _args, _result); err != nil { return } diff --git a/client/genericclient/generic_stream_service.go b/client/genericclient/generic_stream_service.go index 44698ea26a..dda876c417 100644 --- a/client/genericclient/generic_stream_service.go +++ b/client/genericclient/generic_stream_service.go @@ -26,8 +26,7 @@ func StreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo { } func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo { - readerWriter := g.MessageReaderWriter() - if readerWriter == nil { + if g.MessageReaderWriter() == nil { // TODO: support grpc binary generic panic("binary generic streaming is not supported") } @@ -37,12 +36,12 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo { nil, func() interface{} { args := &generic.Args{} - args.SetCodec(readerWriter) + args.SetCodec(g.MessageReaderWriter()) return args }, func() interface{} { result := &generic.Result{} - result.SetCodec(readerWriter) + result.SetCodec(g.MessageReaderWriter()) return result }, false, @@ -52,12 +51,12 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo { nil, func() interface{} { args := &generic.Args{} - args.SetCodec(readerWriter) + args.SetCodec(g.MessageReaderWriter()) return args }, func() interface{} { result := &generic.Result{} - result.SetCodec(readerWriter) + result.SetCodec(g.MessageReaderWriter()) return result }, false, @@ -67,12 +66,12 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo { nil, func() interface{} { args := &generic.Args{} - args.SetCodec(readerWriter) + args.SetCodec(g.MessageReaderWriter()) return args }, func() interface{} { result := &generic.Result{} - result.SetCodec(readerWriter) + result.SetCodec(g.MessageReaderWriter()) return result }, false, @@ -82,12 +81,12 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo { nil, func() interface{} { args := &generic.Args{} - args.SetCodec(readerWriter) + args.SetCodec(g.MessageReaderWriter()) return args }, func() interface{} { result := &generic.Result{} - result.SetCodec(readerWriter) + result.SetCodec(g.MessageReaderWriter()) return result }, false, diff --git a/pkg/generic/generic_service.go b/pkg/generic/generic_service.go index 5bff8c6f1c..222f9fdfca 100644 --- a/pkg/generic/generic_service.go +++ b/pkg/generic/generic_service.go @@ -32,7 +32,7 @@ type Service interface { // ServiceInfoWithGeneric create a generic ServiceInfo func ServiceInfoWithGeneric(g Generic) *serviceinfo.ServiceInfo { isCombinedServices := getIsCombinedServices(g) - return newServiceInfo(g.PayloadCodecType(), g.MessageReaderWriter(), g.IDLServiceName(), isCombinedServices) + return newServiceInfo(g, isCombinedServices) } func getIsCombinedServices(g Generic) bool { @@ -44,16 +44,16 @@ func getIsCombinedServices(g Generic) bool { return false } -func newServiceInfo(pcType serviceinfo.PayloadCodec, messageReaderWriter interface{}, serviceName string, isCombinedServices bool) *serviceinfo.ServiceInfo { +func newServiceInfo(g Generic, isCombinedServices bool) *serviceinfo.ServiceInfo { handlerType := (*Service)(nil) - methods, svcName := GetMethodInfo(messageReaderWriter, serviceName) + methods, svcName := getMethodInfo(g, g.IDLServiceName()) svcInfo := &serviceinfo.ServiceInfo{ ServiceName: svcName, HandlerType: handlerType, Methods: methods, - PayloadCodec: pcType, + PayloadCodec: g.PayloadCodecType(), Extra: make(map[string]interface{}), } svcInfo.Extra["generic"] = true @@ -63,6 +63,36 @@ func newServiceInfo(pcType serviceinfo.PayloadCodec, messageReaderWriter interfa return svcInfo } +func getMethodInfo(g Generic, serviceName string) (methods map[string]serviceinfo.MethodInfo, svcName string) { + if g.MessageReaderWriter() == nil { + // note: binary generic cannot be used with multi-service feature + svcName = serviceinfo.GenericService + methods = map[string]serviceinfo.MethodInfo{ + serviceinfo.GenericMethod: serviceinfo.NewMethodInfo(callHandler, newGenericServiceCallArgs, newGenericServiceCallResult, false), + } + } else { + svcName = serviceName + methods = map[string]serviceinfo.MethodInfo{ + serviceinfo.GenericMethod: serviceinfo.NewMethodInfo( + callHandler, + func() interface{} { + args := &Args{} + args.SetCodec(g.MessageReaderWriter()) + return args + }, + func() interface{} { + result := &Result{} + result.SetCodec(g.MessageReaderWriter()) + return result + }, + false, + ), + } + } + return +} + +/* // GetMethodInfo is only used in kitex, please DON'T USE IT. This method may be removed in the future func GetMethodInfo(messageReaderWriter interface{}, serviceName string) (methods map[string]serviceinfo.MethodInfo, svcName string) { if messageReaderWriter == nil { @@ -92,6 +122,7 @@ func GetMethodInfo(messageReaderWriter interface{}, serviceName string) (methods } return } +*/ func callHandler(ctx context.Context, handler, arg, result interface{}) error { realArg := arg.(*Args)