Skip to content

Commit

Permalink
feat: optimize gRPC error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Nov 11, 2024
1 parent 240f4ab commit 963f2d1
Show file tree
Hide file tree
Showing 20 changed files with 993 additions and 177 deletions.
5 changes: 5 additions & 0 deletions pkg/kerrors/kerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ func IsKitexError(err error) bool {
if _, ok := err.(*DetailedError); ok {
return true
}

if errors.Is(err, ErrStreamingProtocol) {
return true
}

return false
}

Expand Down
19 changes: 13 additions & 6 deletions pkg/kerrors/kerrors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func TestIsKitexError(t *testing.T) {
ErrNoMoreInstance,
ErrConnOverLimit,
ErrQPSOverLimit,
// streaming errors
ErrStreamingProtocol,
ErrStreamTimeout,
ErrBizCanceled,
ErrMetaSizeExceeded,
ErrMetaContentIllegal,
ErrGracefulShutdown,
}
for _, e := range errs {
test.Assert(t, IsKitexError(e))
Expand Down Expand Up @@ -76,15 +83,15 @@ func TestIs(t *testing.T) {
func TestError(t *testing.T) {
basic := "basic"
extra := "extra"
be := &basicError{basic}
be := &basicError{message: basic}
test.Assert(t, be.Error() == basic)
detailedMsg := appendErrMsg(basic, extra)
test.Assert(t, (&DetailedError{basic: be, extraMsg: extra}).Error() == detailedMsg)
}

func TestWithCause(t *testing.T) {
ae := errors.New("any error")
be := &basicError{"basic"}
be := &basicError{message: "basic"}
de := be.WithCause(ae)

test.Assert(t, be.Error() == "basic")
Expand All @@ -102,7 +109,7 @@ func TestWithCause(t *testing.T) {

func TestWithCauseAndStack(t *testing.T) {
ae := errors.New("any error")
be := &basicError{"basic"}
be := &basicError{message: "basic"}
stack := string(debug.Stack())
de := be.WithCauseAndStack(ae, stack)

Expand Down Expand Up @@ -135,7 +142,7 @@ func TestTimeout(t *testing.T) {
return os.IsTimeout(err)
}

ke = &basicError{"non-timeout"}
ke = &basicError{message: "non-timeout"}
TimeoutCheckFunc = osCheck
test.Assert(t, !IsTimeoutError(ke))
TimeoutCheckFunc = nil
Expand Down Expand Up @@ -173,7 +180,7 @@ func TestTimeout(t *testing.T) {
}

func TestWithCause1(t *testing.T) {
ae := &basicError{"basic"}
ae := &basicError{message: "basic"}
be := ErrRPCTimeout.WithCause(ae)
if e2, ok := be.(*DetailedError); ok {
e2.WithExtraMsg("retry circuite break")
Expand Down Expand Up @@ -218,7 +225,7 @@ func BenchmarkWithCause3(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ae := &basicError{"basic"}
ae := &basicError{message: "basic"}
be := ErrRPCTimeout.WithCause(ae)
if e2, ok := be.(*DetailedError); ok {
e2.WithExtraMsg("测试")
Expand Down
29 changes: 29 additions & 0 deletions pkg/kerrors/streaming_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kerrors

var (
// ErrStreamingProtocol is the parent type of all streaming protocol(e.g. gRPC, TTHeader Streaming)
// related but not user-aware errors.
ErrStreamingProtocol = &basicError{"streaming protocol error"}
ErrStreamTimeout = &basicError{"stream timeout"}
ErrBizCanceled = &basicError{"business canceled"}
ErrNonBizCanceled = &basicError{"non-business canceled"}
ErrMetaSizeExceeded = &basicError{"meta size exceeds limit"}
ErrMetaContentIllegal = &basicError{"meta content illegal"}
ErrGracefulShutdown = &basicError{"graceful shutdown"}
)
4 changes: 3 additions & 1 deletion pkg/remote/trans/nphttp2/conn_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
)

func TestConnPool(t *testing.T) {
Expand Down Expand Up @@ -77,7 +79,7 @@ func TestReleaseConn(t *testing.T) {
// close stream to ensure no active stream on this connection,
// which will be released when put back to the connection pool and closed by GracefulClose
s := conn.(*clientConn).s
conn.(*clientConn).tr.CloseStream(s, nil)
conn.(*clientConn).tr.CloseStream(s, status.Err(codes.Internal, "test"))
test.Assert(t, err == nil, err)
time.Sleep(100 * time.Millisecond)
shortCP.Put(conn)
Expand Down
72 changes: 72 additions & 0 deletions pkg/remote/trans/nphttp2/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This file may have been modified by CloudWeGo authors. All CloudWeGo
* Modifications are Copyright 2021 CloudWeGo Authors.
*/

package errors

import (
"errors"
"fmt"

"github.com/cloudwego/kitex/pkg/kerrors"
)

// This package contains all the errors suitable for Kitex errors model.
// These errors should not be used by user directly.
// If users need to perceive these errors, the pkg/streamx/provider/grpc/gerrors package should be used.
var (
// stream error
ErrHTTP2Stream = newErrType("HTTP2Stream err")
ErrClosedWithoutTrailer = newErrType("client received Data frame with END_STREAM flag")
ErrMiddleHeader = newErrType("Headers frame appeared in the middle of a stream")
ErrDecodeHeader = newErrType("decoded Headers frame failed")
ErrRecvRstStream = newErrType("received RstStream frame")
ErrStreamDrain = newErrType("stream rejected by draining connection")
ErrStreamFlowControl = newErrType("stream-level flow control")
ErrIllegalHeaderWrite = newErrType("Illegal header write")

// connection error
ErrHTTP2Connection = newErrType("HTTP2Connection err")
ErrEstablishConnection = newErrType("established connection failed")
ErrHandleGoAway = newErrType("handled GoAway Frame failed")
ErrKeepAlive = newErrType("keepalive failed")
ErrOperateHeaders = newErrType("operated Headers Frame failed")
ErrNoActiveStream = newErrType("no active stream")
)

type errType struct {
message string
// parent errType
basic error
}

func newErrType(message string) *errType {
return &errType{message: message, basic: kerrors.ErrStreamingProtocol}
}

func (e *errType) Error() string {
if e.basic == nil {
return e.message
}
return fmt.Sprintf("%s - %s", e.basic.Error(), e.message)
}

func (e *errType) Is(target error) bool {
return target == e || errors.Is(e.basic, target)
}
58 changes: 58 additions & 0 deletions pkg/remote/trans/nphttp2/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This file may have been modified by CloudWeGo authors. All CloudWeGo
* Modifications are Copyright 2021 CloudWeGo Authors.
*/

package errors

import (
"errors"
"strings"
"testing"

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/kerrors"
)

var errs = []*errType{
// stream error
ErrHTTP2Stream,
ErrClosedWithoutTrailer,
ErrMiddleHeader,
ErrDecodeHeader,
ErrRecvRstStream,
ErrStreamDrain,
ErrStreamFlowControl,
ErrIllegalHeaderWrite,
// connection error
ErrHTTP2Connection,
ErrEstablishConnection,
ErrHandleGoAway,
ErrKeepAlive,
ErrOperateHeaders,
ErrNoActiveStream,
}

func TestErrType(t *testing.T) {
for _, err := range errs {
test.Assert(t, errors.Is(err, kerrors.ErrStreamingProtocol), err)
test.Assert(t, kerrors.IsKitexError(err), err)
test.Assert(t, strings.Contains(err.Error(), err.message), err)
test.Assert(t, strings.Contains(err.Error(), err.basic.Error()), err)
}
}
14 changes: 9 additions & 5 deletions pkg/remote/trans/nphttp2/grpc/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,23 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
c.finish()
return nil, ErrConnClosing
var err error
c.finish(errStatusConnClosing)
c.mu.Lock()
err = c.err
c.mu.Unlock()
return nil, err
}
}
}

func (c *controlBuffer) finish() {
func (c *controlBuffer) finish(err error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return
}
c.err = ErrConnClosing
c.err = err
// There may be headers for streams in the control buffer.
// These streams need to be cleaned out since the transport
// is still not aware of these yet.
Expand All @@ -473,7 +477,7 @@ func (c *controlBuffer) finish() {
continue
}
if hdr.onOrphaned != nil { // It will be nil on the server-side.
hdr.onOrphaned(ErrConnClosing)
hdr.onOrphaned(err)
}
}
c.mu.Unlock()
Expand Down
46 changes: 38 additions & 8 deletions pkg/remote/trans/nphttp2/grpc/controlbuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package grpc

import (
"context"
"errors"
"testing"
"time"

"github.com/cloudwego/kitex/internal/test"
)

func TestControlBuf(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
cb := newControlBuffer(ctx.Done())

// test put()
Expand All @@ -52,24 +53,53 @@ func TestControlBuf(t *testing.T) {
test.Assert(t, !success, err)

// test throttle() mock a lot of response frame so throttle() will block current goroutine
for i := 0; i < maxQueuedTransportResponseFrames+5; i++ {
exceedSize := 5
for i := 0; i < maxQueuedTransportResponseFrames+exceedSize; i++ {
err := cb.put(&ping{})
test.Assert(t, err == nil, err)
}

// start a new goroutine to consume response frame
go func() {
time.Sleep(time.Millisecond * 100)
for {
for i := 0; i < exceedSize+1; i++ {
it, err := cb.get(false)
if err != nil || it == nil {
break
}
test.Assert(t, err == nil, err)
test.Assert(t, it != nil)
}
}()

cb.throttle()
// consumes all of the frames
for {
it, err := cb.get(false)
if err != nil || it == nil {
break
}
}

finishErr := errors.New("finish")
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
var block bool
cb.mu.Lock()
block = cb.consumerWaiting
cb.mu.Unlock()
if block {
cb.finish(finishErr)
cancel()
return
}
}
}()
item, err = cb.get(true)
test.Assert(t, err == finishErr, err)
test.Assert(t, item == nil, item)

// test finish()
cb.finish()
err = cb.put(testItem)
test.Assert(t, err == finishErr, err)
_, err = cb.get(false)
test.Assert(t, err == finishErr, err)
}
Loading

0 comments on commit 963f2d1

Please sign in to comment.