Skip to content

Commit

Permalink
feat: optimize gRPC error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Nov 14, 2024
1 parent 240f4ab commit 9bd9abe
Show file tree
Hide file tree
Showing 21 changed files with 1,498 additions and 185 deletions.
3 changes: 2 additions & 1 deletion pkg/kerrors/kerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ func IsKitexError(err error) bool {
if _, ok := err.(*DetailedError); ok {
return true
}
return false

return IsStreamingError(err)
}

// TimeoutCheckFunc is used to check whether the given err is a timeout error.
Expand Down
12 changes: 11 additions & 1 deletion pkg/kerrors/kerrors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func TestIsKitexError(t *testing.T) {
ErrNoMoreInstance,
ErrConnOverLimit,
ErrQPSOverLimit,
// streaming errors
ErrStreamingProtocol,
errStreamingTimeout,
ErrStreamTimeout,
ErrStreamingCanceled,
ErrBizCanceled,
ErrGracefulShutdown,
errStreamingMeta,
ErrMetaSizeExceeded,
ErrMetaContentIllegal,
}
for _, e := range errs {
test.Assert(t, IsKitexError(e))
Expand Down Expand Up @@ -204,7 +214,7 @@ func TestFormat(t *testing.T) {
error: errors.New("some_business_error"),
}
basicErr := &basicError{
message: "fake_msg",
"fake_msg",
}
err := basicErr.WithCause(businessError)
got := fmt.Sprintf("%+v", err)
Expand Down
61 changes: 61 additions & 0 deletions pkg/kerrors/streaming_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kerrors

import "errors"

var (
// ErrStreamingProtocol is the parent type of all streaming protocol(e.g. gRPC, TTHeader Streaming)
// related but not user-aware errors.
ErrStreamingProtocol = &basicError{"streaming protocol error"}

// errStreamingTimeout is the parent type of all streaming timeout errors.
errStreamingTimeout = &basicError{"streaming timeout error"}
// ErrStreamTimeout denotes the timeout of the whole stream.
ErrStreamTimeout = errStreamingTimeout.WithCause(errors.New("stream timeout"))

// ErrStreamingCanceled is the parent type of all streaming canceled errors.
ErrStreamingCanceled = &basicError{"streaming canceled error"}
// ErrBizCanceled denotes the stream is canceled by the biz code invoking cancel().
ErrBizCanceled = ErrStreamingCanceled.WithCause(errors.New("business canceled"))
// ErrGracefulShutdown denotes the stream is canceled due to graceful shutdown.
ErrGracefulShutdown = ErrStreamingCanceled.WithCause(errors.New("graceful shutdown"))

// errStreamingMeta is the parent type of all streaming meta errors.
errStreamingMeta = &basicError{"streaming meta error"}
// ErrMetaSizeExceeded denotes the streaming meta size exceeds the limit.
ErrMetaSizeExceeded = errStreamingMeta.WithCause(errors.New("meta size exceeds limit"))
// ErrMetaContentIllegal denotes the streaming meta content is illegal.
ErrMetaContentIllegal = errStreamingMeta.WithCause(errors.New("meta content illegal"))

streamingBasicErrors = []*basicError{
ErrStreamingProtocol,
errStreamingTimeout,
ErrStreamingCanceled,
errStreamingMeta,
}
)

// IsStreamingError reports whether the given err is a streaming err
func IsStreamingError(err error) bool {
for _, sErr := range streamingBasicErrors {
if errors.Is(err, sErr) {
return true
}
}
return false
}
40 changes: 40 additions & 0 deletions pkg/kerrors/streaming_errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kerrors

import (
"testing"

"github.com/cloudwego/kitex/internal/test"
)

func TestIsStreamingError(t *testing.T) {
errs := []error{
ErrStreamingProtocol,
errStreamingTimeout,
ErrStreamTimeout,
ErrStreamingCanceled,
ErrBizCanceled,
ErrGracefulShutdown,
errStreamingMeta,
ErrMetaSizeExceeded,
ErrMetaContentIllegal,
}
for _, err := range errs {
test.Assert(t, IsStreamingError(err), err)
}
}
4 changes: 3 additions & 1 deletion pkg/remote/trans/nphttp2/conn_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
)

func TestConnPool(t *testing.T) {
Expand Down Expand Up @@ -77,7 +79,7 @@ func TestReleaseConn(t *testing.T) {
// close stream to ensure no active stream on this connection,
// which will be released when put back to the connection pool and closed by GracefulClose
s := conn.(*clientConn).s
conn.(*clientConn).tr.CloseStream(s, nil)
conn.(*clientConn).tr.CloseStream(s, status.Err(codes.Internal, "test"))
test.Assert(t, err == nil, err)
time.Sleep(100 * time.Millisecond)
shortCP.Put(conn)
Expand Down
77 changes: 77 additions & 0 deletions pkg/remote/trans/nphttp2/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This file may have been modified by CloudWeGo authors. All CloudWeGo
* Modifications are Copyright 2021 CloudWeGo Authors.
*/

package errors

import (
"errors"
"fmt"

"github.com/cloudwego/kitex/pkg/kerrors"
)

// This package contains all the errors suitable for Kitex errors model.
// These errors should not be used by user directly.
// If users need to perceive these errors, the pkg/streamx/provider/grpc/gerrors package should be used.
var (
// stream error
ErrHTTP2Stream = newErrType("HTTP2Stream err when parsing HTTP2 frame")
ErrClosedWithoutTrailer = newErrType("client received Data frame with END_STREAM flag")
ErrMiddleHeader = newErrType("Headers frame appeared in the middle of a stream")
ErrDecodeHeader = newErrType("decoded Headers frame failed")
ErrRecvRstStream = newErrType("received RstStream frame")
ErrStreamDrain = newErrType("stream rejected by draining connection")
ErrStreamFlowControl = newErrType("stream-level flow control")
ErrIllegalHeaderWrite = newErrType("Headers frame has been already sent by server")
ErrStreamIsDone = newErrType("stream is done")
ErrMaxStreamExceeded = newErrType("max stream exceeded")

// connection error
ErrHTTP2Connection = newErrType("HTTP2Connection err when parsing HTTP2 frame")
ErrEstablishConnection = newErrType("established connection failed")
ErrHandleGoAway = newErrType("handled GoAway Frame failed")
ErrKeepAlive = newErrType("keepalive failed")
ErrOperateHeaders = newErrType("operated Headers Frame failed")
ErrNoActiveStream = newErrType("no active stream")
ErrControlBufFinished = newErrType("controlbuf finished")
ErrNotReachable = newErrType("server transport is not reachable")
ErrConnectionIsClosing = newErrType("connection is closing")
)

type errType struct {
message string
// parent errType
basic error
}

func newErrType(message string) *errType {
return &errType{message: message, basic: kerrors.ErrStreamingProtocol}
}

func (e *errType) Error() string {
if e.basic == nil {
return e.message
}
return fmt.Sprintf("%s - %s", e.basic.Error(), e.message)
}

func (e *errType) Is(target error) bool {
return target == e || errors.Is(e.basic, target)
}
64 changes: 64 additions & 0 deletions pkg/remote/trans/nphttp2/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This file may have been modified by CloudWeGo authors. All CloudWeGo
* Modifications are Copyright 2021 CloudWeGo Authors.
*/

package errors

import (
"errors"
"strings"
"testing"

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/kerrors"
)

var errs = []*errType{
// stream error
ErrHTTP2Stream,
ErrClosedWithoutTrailer,
ErrMiddleHeader,
ErrDecodeHeader,
ErrRecvRstStream,
ErrStreamDrain,
ErrStreamFlowControl,
ErrIllegalHeaderWrite,
ErrStreamIsDone,
ErrMaxStreamExceeded,
// connection error
ErrHTTP2Connection,
ErrEstablishConnection,
ErrHandleGoAway,
ErrKeepAlive,
ErrOperateHeaders,
ErrNoActiveStream,
ErrControlBufFinished,
ErrNotReachable,
ErrConnectionIsClosing,
}

func TestErrType(t *testing.T) {
for _, err := range errs {
test.Assert(t, errors.Is(err, kerrors.ErrStreamingProtocol), err)
test.Assert(t, kerrors.IsKitexError(err), err)
test.Assert(t, kerrors.IsStreamingError(err), err)
test.Assert(t, strings.Contains(err.Error(), err.message), err)
test.Assert(t, strings.Contains(err.Error(), err.basic.Error()), err)
}
}
16 changes: 10 additions & 6 deletions pkg/remote/trans/nphttp2/grpc/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,23 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
c.finish()
return nil, ErrConnClosing
var err error
c.finish(errStatusControlBufFinished)
c.mu.Lock()
err = c.err
c.mu.Unlock()
return nil, err
}
}
}

func (c *controlBuffer) finish() {
func (c *controlBuffer) finish(err error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return
}
c.err = ErrConnClosing
c.err = err
// There may be headers for streams in the control buffer.
// These streams need to be cleaned out since the transport
// is still not aware of these yet.
Expand All @@ -473,7 +477,7 @@ func (c *controlBuffer) finish() {
continue
}
if hdr.onOrphaned != nil { // It will be nil on the server-side.
hdr.onOrphaned(ErrConnClosing)
hdr.onOrphaned(err)
}
}
c.mu.Unlock()
Expand Down Expand Up @@ -696,7 +700,7 @@ func (l *loopyWriter) originateStream(str *outStream) error {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
// Other errors(errStatusStreamDrain) need not close transport.
return nil
}
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
Expand Down
Loading

0 comments on commit 9bd9abe

Please sign in to comment.