Skip to content

Commit

Permalink
online: gRPC release stream automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Jan 14, 2025
1 parent 8526b3a commit b258278
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions pkg/remote/trans/nphttp2/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"time"

"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc"
Expand Down Expand Up @@ -100,6 +101,32 @@ func newClientConn(ctx context.Context, tr grpc.ClientTransport, addr string) (*
if err != nil {
return nil, err
}
// gRPC unary do not need to monitor the stream ctx and transport ctx
// since it must invoke stream.Recv which would inspect the stream.ctx
if isStreaming {
gofunc.GoFunc(ctx, func() {
sCtx := s.Context()
select {
// For these scenarios, stream.ctx would be canceled:
// 1. user invoke cancel()
// 2. parent stream is done
case <-sCtx.Done():
tr.CloseStream(s, sCtx.Err())
return
// when http2Client.closeStream is called, stream.Done() would be closed.
// Important: http2Client.closeStream would not lead to stream.ctx canceled.
case <-s.Done():
// since stream is closed, we just exit without doing anything
return
// For now, t.ctx would not be canceled.
// Pls check pkg/remote/trans/nphttp2/conn_pool for details.
case <-tr.Error():
tr.CloseStream(s, grpc.ErrConnClosing)
return
}
})
}

return &clientConn{
tr: tr,
s: s,
Expand Down

0 comments on commit b258278

Please sign in to comment.