Skip to content

Commit 7688c77

Browse files
committed
feat: optimize gRPC error handling
1 parent 240f4ab commit 7688c77

File tree

8 files changed

+198
-66
lines changed

8 files changed

+198
-66
lines changed

pkg/remote/trans/nphttp2/grpc/controlbuf.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -451,19 +451,23 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
451451
select {
452452
case <-c.ch:
453453
case <-c.done:
454-
c.finish()
455-
return nil, ErrConnClosing
454+
var err error
455+
c.finish(errStatusConnClosing)
456+
c.mu.Lock()
457+
err = c.err
458+
c.mu.Unlock()
459+
return nil, err
456460
}
457461
}
458462
}
459463

460-
func (c *controlBuffer) finish() {
464+
func (c *controlBuffer) finish(err error) {
461465
c.mu.Lock()
462466
if c.err != nil {
463467
c.mu.Unlock()
464468
return
465469
}
466-
c.err = ErrConnClosing
470+
c.err = err
467471
// There may be headers for streams in the control buffer.
468472
// These streams need to be cleaned out since the transport
469473
// is still not aware of these yet.
@@ -473,7 +477,7 @@ func (c *controlBuffer) finish() {
473477
continue
474478
}
475479
if hdr.onOrphaned != nil { // It will be nil on the server-side.
476-
hdr.onOrphaned(ErrConnClosing)
480+
hdr.onOrphaned(err)
477481
}
478482
}
479483
c.mu.Unlock()

pkg/remote/trans/nphttp2/grpc/controlbuf_test.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ package grpc
1818

1919
import (
2020
"context"
21+
"errors"
2122
"testing"
2223
"time"
2324

2425
"github.com/cloudwego/kitex/internal/test"
2526
)
2627

2728
func TestControlBuf(t *testing.T) {
28-
ctx := context.Background()
29+
ctx, cancel := context.WithCancel(context.Background())
2930
cb := newControlBuffer(ctx.Done())
3031

3132
// test put()
@@ -52,24 +53,53 @@ func TestControlBuf(t *testing.T) {
5253
test.Assert(t, !success, err)
5354

5455
// test throttle() mock a lot of response frame so throttle() will block current goroutine
55-
for i := 0; i < maxQueuedTransportResponseFrames+5; i++ {
56+
exceedSize := 5
57+
for i := 0; i < maxQueuedTransportResponseFrames+exceedSize; i++ {
5658
err := cb.put(&ping{})
5759
test.Assert(t, err == nil, err)
5860
}
5961

6062
// start a new goroutine to consume response frame
6163
go func() {
6264
time.Sleep(time.Millisecond * 100)
63-
for {
65+
for i := 0; i < exceedSize+1; i++ {
6466
it, err := cb.get(false)
65-
if err != nil || it == nil {
66-
break
67-
}
67+
test.Assert(t, err == nil, err)
68+
test.Assert(t, it != nil)
6869
}
6970
}()
7071

7172
cb.throttle()
73+
// consumes all of the frames
74+
for {
75+
it, err := cb.get(false)
76+
if err != nil || it == nil {
77+
break
78+
}
79+
}
80+
81+
finishErr := errors.New("finish")
82+
go func() {
83+
ticker := time.NewTicker(10 * time.Millisecond)
84+
defer ticker.Stop()
85+
for range ticker.C {
86+
var block bool
87+
cb.mu.Lock()
88+
block = cb.consumerWaiting
89+
cb.mu.Unlock()
90+
if block {
91+
cb.finish(finishErr)
92+
cancel()
93+
return
94+
}
95+
}
96+
}()
97+
item, err = cb.get(true)
98+
test.Assert(t, err == finishErr, err)
99+
test.Assert(t, item == nil, item)
72100

73-
// test finish()
74-
cb.finish()
101+
err = cb.put(testItem)
102+
test.Assert(t, err == finishErr, err)
103+
_, err = cb.get(false)
104+
test.Assert(t, err == finishErr, err)
75105
}

pkg/remote/trans/nphttp2/grpc/http2_client.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
534534
if err != nil {
535535
rst = true
536536
rstCode = http2.ErrCodeCancel
537+
klog.CtxInfof(s.ctx, "KITEX: stream closed by user side ctx canceled, err: %v, code: %d [send RSTStream Frame]", err, rstCode)
537538
}
538539
t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
539540
}
@@ -557,6 +558,15 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
557558
// This will unblock reads eventually.
558559
s.write(recvMsg{err: err})
559560
}
561+
562+
// store closeStreamErr
563+
if err == io.EOF {
564+
err = st.Err()
565+
}
566+
if err != nil {
567+
s.closeStreamErr.Store(err)
568+
}
569+
560570
// If headerChan isn't closed, then close it.
561571
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
562572
s.noHeaders = true
@@ -597,6 +607,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
597607
// re-connected. This happens because t.onClose() begins reconnect logic at the
598608
// addrConn level and blocks until the addrConn is successfully connected.
599609
func (t *http2Client) Close(err error) error {
610+
if rawErr, ok := err.(ConnectionError); ok {
611+
err = status.Err(codes.Unavailable, rawErr.Desc)
612+
}
600613
t.mu.Lock()
601614
// Make sure we only Close once.
602615
if t.state == closing {
@@ -617,7 +630,7 @@ func (t *http2Client) Close(err error) error {
617630
t.kpDormancyCond.Signal()
618631
}
619632
t.mu.Unlock()
620-
t.controlBuf.finish()
633+
t.controlBuf.finish(err)
621634
t.cancel()
622635
cErr := t.conn.Close()
623636

@@ -656,10 +669,10 @@ func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
656669
if opts.Last {
657670
// If it's the last message, update stream state.
658671
if !s.compareAndSwapState(streamActive, streamWriteDone) {
659-
return errStreamDone
672+
return s.getCloseStreamErr()
660673
}
661674
} else if s.getState() != streamActive {
662-
return errStreamDone
675+
return s.getCloseStreamErr()
663676
}
664677
df := newDataFrame()
665678
df.streamID = s.id
@@ -670,7 +683,7 @@ func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
670683
df.originD = df.d
671684
if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
672685
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
673-
return err
686+
return s.getCloseStreamErr()
674687
}
675688
}
676689
return t.controlBuf.put(df)
@@ -766,6 +779,7 @@ func (t *http2Client) handleData(f *grpcframe.DataFrame) {
766779
}
767780
if size > 0 {
768781
if err := s.fc.onData(size); err != nil {
782+
klog.CtxErrorf(s.ctx, "KITEX: http2Client.handleData inflow control err: %v, code: %d [send RSTStream Frame]", err, http2.ErrCodeFlowControl)
769783
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
770784
return
771785
}
@@ -983,6 +997,7 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
983997
if !initialHeader && !endStream {
984998
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
985999
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1000+
klog.CtxErrorf(s.ctx, "KITEX: http2Client.operateHeaders received HEADERS frame in the middle of a stream, code: %d [send RSTStream Frame]", http2.ErrCodeProtocol)
9861001
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
9871002
return
9881003
}
@@ -991,6 +1006,7 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
9911006
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
9921007
state.data.isGRPC = !initialHeader
9931008
if err := state.decodeHeader(frame); err != nil {
1009+
klog.CtxErrorf(s.ctx, "KITEX: http2Client.operateHeaders decode HEADERS frame failed, err: %v, code: %d [send RSTStream Frame]", err, http2.ErrCodeProtocol)
9941010
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
9951011
return
9961012
}
@@ -1034,7 +1050,7 @@ func (t *http2Client) reader() {
10341050
// Check the validity of server preface.
10351051
frame, err := t.framer.ReadFrame()
10361052
if err != nil {
1037-
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
1053+
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
10381054
t.Close(err) // this kicks off resetTransport, so must be last before return
10391055
return
10401056
}
@@ -1073,12 +1089,13 @@ func (t *http2Client) reader() {
10731089
if err != nil {
10741090
msg = err.Error()
10751091
}
1092+
klog.CtxErrorf(s.ctx, "KITEX: http2Client.reader encountered http2.StreamError: %v, code: %d [send RSTStream Frame]", se, http2.ErrCodeProtocol)
10761093
t.closeStream(s, status.New(code, msg).Err(), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
10771094
}
10781095
continue
10791096
} else {
10801097
// Transport error.
1081-
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
1098+
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
10821099
t.Close(err)
10831100
return
10841101
}

0 commit comments

Comments
 (0)