Skip to content

Commit

Permalink
revert to original status and add server-side err suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Sep 18, 2024
1 parent 8683021 commit ba48fa1
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 78 deletions.
41 changes: 29 additions & 12 deletions pkg/remote/trans/nphttp2/grpc/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,33 @@ var (

// errors used for cancelling stream.
// the code should be codes.Canceled coz it's NOT returned from remote
errConnectionEOF = status.ErrWithTriggeredByUpstream(codes.Canceled, "transport: connection EOF")
errMaxStreamsExceeded = status.Err(codes.Canceled, "transport: max streams exceeded")
errNotReachable = status.Err(codes.Canceled, "transport: server not reachable")
errMaxAgeClosing = status.ErrWithTriggeredByUpstream(codes.Canceled, "transport: closing server transport due to maximum connection age")
errIdleClosing = status.ErrWithTriggeredByUpstream(codes.Canceled, "transport: closing server transport due to idleness")

errHeaderListSizeLimitViolation = status.Err(codes.Internal, ErrHeaderListSizeLimitViolation.Error())
errIllegalHeaderWrite = status.Err(codes.Internal, ErrIllegalHeaderWrite.Error())
errConnectionEOF = status.Err(codes.Canceled, remoteErrMsg("transport: connection EOF"))
errMaxStreamsExceeded = status.Err(codes.Canceled, remoteErrMsg("transport: max streams exceeded"))
errNotReachable = status.Err(codes.Canceled, remoteErrMsg("transport: server not reachable"))
errMaxAgeClosing = status.Err(codes.Canceled, "transport: closing server transport due to maximum connection age")
errIdleClosing = status.Err(codes.Canceled, remoteErrMsg("transport: closing server transport due to idleness"))
errHeaderListSizeLimitViolation = status.Err(codes.Internal, handlerSideErrMsg(ErrHeaderListSizeLimitViolation.Error()))
errIllegalHeaderWrite = status.Err(codes.Internal, handlerSideErrMsg(ErrIllegalHeaderWrite.Error()))
)

const (
remoteErrTpl = "[triggered by %s]"
remoteErrSuffix = "[triggered by remote service]"
handlerSizeSuffix = "[triggered by handler side]"
)

func remoteErrMsg(msg string) string {
return msg + remoteErrSuffix
}

func remoteErrMsgf(msg string, a ...interface{}) string {
return fmt.Sprintf(msg+remoteErrSuffix, a...)
}

func handlerSideErrMsg(msg string) string {
return msg + handlerSizeSuffix
}

func init() {
rand.Seed(time.Now().UnixNano())
}
Expand Down Expand Up @@ -408,7 +425,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
t.closeStream(s, status.ErrorfWithTriggeredByUpstream(codes.Canceled, "transport: ReadFrame encountered http2.StreamError: %v", err), true, se.Code, false)
t.closeStream(s, status.Err(codes.Canceled, s.getRemoteErrMsgf("transport: ReadFrame encountered http2.StreamError: %v", err)), true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
Expand All @@ -425,7 +442,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
return
}
klog.CtxWarnf(t.ctx, "transport: http2Server.HandleStreams failed to read frame: %v", err)
t.closeWithErr(status.ErrorfWithTriggeredByUpstream(codes.Canceled, "transport: ReadFrame encountered err: %v", err))
t.closeWithErr(status.Err(codes.Canceled, remoteErrMsgf("transport: ReadFrame encountered err: %v", err)))
return
}
switch frame := frame.(type) {
Expand Down Expand Up @@ -552,7 +569,7 @@ func (t *http2Server) handleData(f *grpcframe.DataFrame) {
}
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, status.ErrorfWithTriggeredByUpstream(codes.Canceled, "transport: inflow control err: %v", err), true, http2.ErrCodeFlowControl, false)
t.closeStream(s, status.Err(codes.Canceled, s.getRemoteErrMsgf("transport: inflow control err: %v", err)), true, http2.ErrCodeFlowControl, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
Expand Down Expand Up @@ -580,7 +597,7 @@ func (t *http2Server) handleData(f *grpcframe.DataFrame) {
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
if s, ok := t.getStream(f); ok {
t.closeStream(s, status.ErrorfWithTriggeredByUpstream(codes.Canceled, "transport: RSTStream Frame received with error code: %v", f.ErrCode), false, 0, false)
t.closeStream(s, status.Err(codes.Canceled, s.getRemoteErrMsgf("transport: RSTStream Frame received with error code: %v", f.ErrCode)), false, 0, false)
return
}
// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
Expand Down
6 changes: 6 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/http2_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,9 @@ func testServerSideCloseStreamErrorHandling_WriteStatus(t *testing.T) {
cli.waitServerFinished()
})
}

func TestErrMsg(t *testing.T) {
test.Assert(t, remoteErrMsg("test") == "test[triggered by remote service]")
test.Assert(t, remoteErrMsgf("test: %s", "val") == "test: val[triggered by remote service]")
test.Assert(t, handlerSideErrMsg("test") == "test[triggered by handler side]")
}
39 changes: 39 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package grpc

import (
"testing"

"github.com/cloudwego/kitex/internal/test"
)

func TestStream_SetSourceService(t *testing.T) {
s := new(Stream)
testSvc := "test service"
testErrMsg := "test err"
testErrMsgTpl := "test err: %s"
testErrMsgVal := "val"
test.Assert(t, s.getSourceService() == "remote service")
test.Assert(t, s.getRemoteErrMsg(testErrMsg) == "test err[triggered by remote service]")
test.Assert(t, s.getRemoteErrMsgf(testErrMsgTpl, testErrMsgVal) == "test err: val[triggered by remote service]")

s.SetSourceService(testSvc)
test.Assert(t, s.getSourceService() == "test service")
test.Assert(t, s.getRemoteErrMsg(testErrMsg) == "test err[triggered by test service]")
test.Assert(t, s.getRemoteErrMsgf(testErrMsgTpl, testErrMsgVal) == "test err: val[triggered by test service]")
}
22 changes: 22 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ type Stream struct {

// closeStreamErr is used to store the error when stream is closed
closeStreamErr atomic.Value
// sourceService is the source service name of this stream
sourceService atomic.Value
}

// isHeaderSent is only valid on the server-side.
Expand Down Expand Up @@ -490,6 +492,26 @@ func (s *Stream) getCloseStreamErr() error {
return errStatusStreamDone
}

func (s *Stream) SetSourceService(svc string) {
s.sourceService.Store(svc)
}

func (s *Stream) getSourceService() string {
rawSvc := s.sourceService.Load()
if rawSvc != nil {
return rawSvc.(string)
}
return "remote service"
}

func (s *Stream) getRemoteErrMsg(msg string) string {
return msg + fmt.Sprintf(remoteErrTpl, s.getSourceService())
}

func (s *Stream) getRemoteErrMsgf(msg string, a ...interface{}) string {
return fmt.Sprintf(msg, a...) + fmt.Sprintf(remoteErrTpl, s.getSourceService())
}

// StreamWrite only used for unit test
func StreamWrite(s *Stream, buffer *bytes.Buffer) {
s.write(recvMsg{buffer: buffer})
Expand Down
1 change: 1 addition & 0 deletions pkg/remote/trans/nphttp2/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (t *svrTransHandler) handleFunc(s *grpcTransport.Stream, svrTrans *SvrTrans
return
}
}
s.SetSourceService(ri.From().ServiceName())
rCtx = t.startTracer(rCtx, ri)
defer func() {
panicErr := recover()
Expand Down
39 changes: 2 additions & 37 deletions pkg/remote/trans/nphttp2/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,34 +49,18 @@ type Iface interface {
// and should be created with New, Newf, or FromProto.
type Status struct {
s *spb.Status
// triggeredByUpstream identifies whether this error was triggered by upstream
triggeredByUpstream bool
}

// New returns a Status representing c and msg.
func New(c codes.Code, msg string) *Status {
return &Status{s: &spb.Status{Code: int32(c), Message: msg}}
}

// NewWithTriggeredByUpstream returns a Status identified triggered by upstream
func NewWithTriggeredByUpstream(c codes.Code, msg string) *Status {
res := New(c, msg)
res.triggeredByUpstream = true
return res
}

// Newf returns New(c, fmt.Sprintf(format, a...)).
func Newf(c codes.Code, format string, a ...interface{}) *Status {
return New(c, fmt.Sprintf(format, a...))
}

// NewfWithTriggeredByUpstream is the same as Newf with identifying triggered by upstream
func NewfWithTriggeredByUpstream(c codes.Code, format string, a ...interface{}) *Status {
res := Newf(c, format, a...)
res.triggeredByUpstream = true
return res
}

// ErrorProto returns an error representing s. If s.Code is OK, returns nil.
func ErrorProto(s *spb.Status) error {
return FromProto(s).Err()
Expand All @@ -92,21 +76,11 @@ func Err(c codes.Code, msg string) error {
return New(c, msg).Err()
}

// ErrWithTriggeredByUpstream is the same as Err with identifying triggered by upstream
func ErrWithTriggeredByUpstream(c codes.Code, msg string) error {
return NewWithTriggeredByUpstream(c, msg).Err()
}

// Errorf returns Error(c, fmt.Sprintf(format, a...)).
func Errorf(c codes.Code, format string, a ...interface{}) error {
return Err(c, fmt.Sprintf(format, a...))
}

// ErrorfWithTriggeredByUpstream is the same as Errorf with identifying triggered by upstream
func ErrorfWithTriggeredByUpstream(c codes.Code, format string, a ...interface{}) error {
return NewfWithTriggeredByUpstream(c, format, a...).Err()
}

// Code returns the status code contained in s.
func (s *Status) Code() codes.Code {
if s == nil || s.s == nil {
Expand Down Expand Up @@ -145,7 +119,7 @@ func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
return &Error{e: s.Proto(), triggeredByUpstream: s.triggeredByUpstream}
return &Error{e: s.Proto()}
}

// WithDetails returns a new status with the provided details messages appended to the status.
Expand Down Expand Up @@ -184,22 +158,13 @@ func (s *Status) Details() []interface{} {
return details
}

// TriggeredByUpstream returns whether the error was triggered by upstream.
func (s *Status) TriggeredByUpstream() bool {
return s.triggeredByUpstream
}

// Error wraps a pointer of a status proto. It implements error and Status,
// and a nil *Error should never be returned by this package.
type Error struct {
e *spb.Status
triggeredByUpstream bool
e *spb.Status
}

func (e *Error) Error() string {
if e.triggeredByUpstream {
return fmt.Sprintf("upstream error: code = %d desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage())
}
return fmt.Sprintf("rpc error: code = %d desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage())
}

Expand Down
31 changes: 2 additions & 29 deletions pkg/remote/trans/nphttp2/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package status
import (
"context"
"fmt"
"strings"
"testing"

spb "google.golang.org/genproto/googleapis/rpc/status"
Expand Down Expand Up @@ -71,7 +70,7 @@ func TestError(t *testing.T) {
s.Code = 1
s.Message = "test err"

er := &Error{s, false}
er := &Error{s}
test.Assert(t, len(er.Error()) > 0)

status := er.GRPCStatus()
Expand Down Expand Up @@ -102,7 +101,7 @@ func TestFromContextError(t *testing.T) {
s := new(spb.Status)
s.Code = 1
s.Message = "test err"
grpcErr := &Error{s, false}
grpcErr := &Error{s}
// grpc err
codeGrpcErr := Code(grpcErr)
test.Assert(t, codeGrpcErr == codes.Canceled)
Expand All @@ -115,29 +114,3 @@ func TestFromContextError(t *testing.T) {
codeNil := Code(nil)
test.Assert(t, codeNil == codes.OK)
}

func TestStatusWithTriggeredByUpstream(t *testing.T) {
statusMsg := "test"
statusOK := NewWithTriggeredByUpstream(codes.OK, statusMsg)
test.Assert(t, statusOK.Code() == codes.OK)
test.Assert(t, statusOK.Message() == statusMsg)
test.Assert(t, statusOK.triggeredByUpstream)
statusErr := statusOK.Err()
test.Assert(t, statusErr == nil)

statusCanceled := NewfWithTriggeredByUpstream(codes.Canceled, "%s", statusMsg)
test.Assert(t, statusCanceled.Code() == codes.Canceled)
test.Assert(t, statusCanceled.Message() == statusMsg)
test.Assert(t, statusCanceled.triggeredByUpstream)
statusErr = statusCanceled.Err()
test.Assert(t, strings.Contains(statusErr.Error(), "upstream error"))
}

func TestErrorWithTriggeredByUpstream(t *testing.T) {
statusMsg := "test"
errOK := ErrWithTriggeredByUpstream(codes.OK, statusMsg)
test.Assert(t, errOK == nil)

errCanceled := ErrorfWithTriggeredByUpstream(codes.Canceled, "%s", statusMsg)
test.Assert(t, strings.Contains(errCanceled.Error(), "upstream error"))
}

0 comments on commit ba48fa1

Please sign in to comment.