Skip to content

Commit

Permalink
fix: stream connection leak in case of error (#38321)
Browse files Browse the repository at this point in the history
issue: #38318
pr: #38320

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Dec 11, 2024
1 parent e758d8e commit 7cee01a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
14 changes: 11 additions & 3 deletions pkg/mq/msgdispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,16 @@ func NewDispatcher(ctx context.Context,
log := log.With(zap.String("pchannel", pchannel),
zap.String("subName", subName), zap.Bool("isMain", isMain))
log.Info("creating dispatcher...")
stream, err := factory.NewTtMsgStream(ctx)

var stream msgstream.MsgStream
var err error
defer func() {
if err != nil && stream != nil {
stream.Close()
}
}()

stream, err = factory.NewTtMsgStream(ctx)
if err != nil {
return nil, err
}
Expand All @@ -106,15 +115,14 @@ func NewDispatcher(ctx context.Context,

err = stream.Seek(ctx, []*Pos{position}, false)
if err != nil {
stream.Close()
log.Error("seek failed", zap.Error(err))
return nil, err
}
posTime := tsoutil.PhysicalTime(position.GetTimestamp())
log.Info("seek successfully", zap.Uint64("posTs", position.GetTimestamp()),
zap.Time("posTime", posTime), zap.Duration("tsLag", time.Since(posTime)))
} else {
err := stream.AsConsumer(ctx, []string{pchannel}, subName, subPos)
err = stream.AsConsumer(ctx, []string{pchannel}, subName, subPos)
if err != nil {
log.Error("asConsumer failed", zap.Error(err))
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/mq/msgdispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestDispatcher(t *testing.T) {
t.Run("test AsConsumer fail", func(t *testing.T) {
ms := msgstream.NewMockMsgStream(t)
ms.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error"))
ms.EXPECT().Close().Return()
factory := &msgstream.MockMqFactory{
NewMsgStreamFunc: func(ctx context.Context) (msgstream.MsgStream, error) {
return ms, nil
Expand Down

0 comments on commit 7cee01a

Please sign in to comment.