From 327d2c6f2b7d16a4b6e37b89f927bdd3dba9616e Mon Sep 17 00:00:00 2001 From: yangxuan Date: Thu, 19 Dec 2024 18:19:14 +0800 Subject: [PATCH] fix: ChannelManager concurret Release and Watch bug See also: #38589 Signed-off-by: yangxuan --- internal/datacoord/channel.go | 20 +++++++++- internal/datacoord/channel_manager.go | 46 +++++++++++++++------- internal/datacoord/channel_manager_test.go | 44 +++++++++++++++++++++ internal/datacoord/channel_store.go | 21 +++++----- internal/datacoord/channel_store_test.go | 2 +- internal/datacoord/mock_channel_store.go | 34 ++++++---------- 6 files changed, 115 insertions(+), 52 deletions(-) diff --git a/internal/datacoord/channel.go b/internal/datacoord/channel.go index 0a80b43a0745f..4de0ee900245a 100644 --- a/internal/datacoord/channel.go +++ b/internal/datacoord/channel.go @@ -191,7 +191,15 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St return c } -func (c *StateChannel) TransitionOnSuccess() { +func (c *StateChannel) TransitionOnSuccess(opID int64) { + if opID != c.Info.GetOpID() { + log.Warn("Try to transit on success but opID not match, stay original state ", + zap.Any("currentState", c.currentState), + zap.String("channel", c.Name), + zap.Int64("target opID", opID), + zap.Int64("channel opID", c.Info.GetOpID())) + return + } switch c.currentState { case Standby: c.setState(ToWatch) @@ -208,7 +216,15 @@ func (c *StateChannel) TransitionOnSuccess() { } } -func (c *StateChannel) TransitionOnFailure() { +func (c *StateChannel) TransitionOnFailure(opID int64) { + if opID != c.Info.GetOpID() { + log.Warn("Try to transit on failure but opID not match, stay original state", + zap.Any("currentState", c.currentState), + zap.String("channel", c.Name), + zap.Int64("target opID", opID), + zap.Int64("channel opID", c.Info.GetOpID())) + return + } switch c.currentState { case Watching: c.setState(Standby) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 15b3933d5ded7..40286294090bb 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -546,8 +546,8 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [ nodeID := nodeAssign.NodeID var ( - succeededChannels = make([]RWChannel, 0, channelCount) - failedChannels = make([]RWChannel, 0, channelCount) + succeededChannels = 0 + failedChannels = 0 futures = make([]*conc.Future[any], 0, channelCount) ) @@ -564,31 +564,42 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [ future := getOrCreateIOPool().Submit(func() (any, error) { err := m.Notify(ctx, nodeID, tmpWatchInfo) - return innerCh, err + return poolResult{ + ch: innerCh, + opID: tmpWatchInfo.GetOpID(), + }, err }) futures = append(futures, future) } for _, f := range futures { - ch, err := f.Await() + got, err := f.Await() + res := got.(poolResult) + if err != nil { - failedChannels = append(failedChannels, ch.(RWChannel)) + log.Ctx(ctx).Warn("Failed to notify channel operations to datanode", + zap.Int64("assignment", nodeAssign.NodeID), + zap.Int("operation count", channelCount), + zap.String("channel name", res.ch.GetName()), + zap.Error(err), + ) + failedChannels++ } else { - succeededChannels = append(succeededChannels, ch.(RWChannel)) + succeededChannels++ advanced = true } + + m.mu.Lock() + m.store.UpdateState(err == nil, nodeID, res.ch, res.opID) + m.mu.Unlock() } log.Ctx(ctx).Info("Finish to notify channel operations to datanode", zap.Int64("assignment", nodeAssign.NodeID), zap.Int("operation count", channelCount), - zap.Int("success count", len(succeededChannels)), - zap.Int("failure count", len(failedChannels)), + zap.Int("success count", succeededChannels), + zap.Int("failure count", failedChannels), ) - m.mu.Lock() - m.store.UpdateState(false, failedChannels...) - m.store.UpdateState(true, succeededChannels...) - m.mu.Unlock() } return advanced @@ -597,6 +608,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [ type poolResult struct { successful bool ch RWChannel + opID int64 } func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool { @@ -617,13 +629,15 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No for _, ch := range nodeAssign.Channels { innerCh := ch + tmpWatchInfo := typeutil.Clone(innerCh.GetWatchInfo()) future := getOrCreateIOPool().Submit(func() (any, error) { - successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo()) + successful, got := m.Check(ctx, nodeID, tmpWatchInfo) if got { return poolResult{ successful: successful, ch: innerCh, + opID: tmpWatchInfo.GetOpID(), }, nil } return nil, errors.New("Got results with no progress") @@ -633,10 +647,10 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No for _, f := range futures { got, err := f.Await() + result := got.(poolResult) if err == nil { m.mu.Lock() - result := got.(poolResult) - m.store.UpdateState(result.successful, result.ch) + m.store.UpdateState(result.successful, nodeID, result.ch, result.opID) m.mu.Unlock() advanced = true @@ -656,6 +670,7 @@ func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *dat zap.String("channel", info.GetVchan().GetChannelName()), zap.Int64("assignment", nodeID), zap.String("operation", info.GetState().String()), + zap.Int64("opID", info.GetOpID()), ) log.Info("Notify channel operation") err := m.subCluster.NotifyChannelOperation(ctx, nodeID, &datapb.ChannelOperationsRequest{Infos: []*datapb.ChannelWatchInfo{info}}) @@ -706,6 +721,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data if resp.GetState() == datapb.ChannelWatchState_ReleaseFailure { return false, true } + } return false, false } diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 99a0d953cd369..8cf1f539dc758 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -450,6 +450,50 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() { s.checkAssignment(m, 1, "ch1", Watching) s.checkAssignment(m, 1, "ch2", Watching) }) + + s.Run("advance watching channels released during check", func() { + idx := int64(19530) + mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(s.T()) + mockAlloc.EXPECT().AllocOne(). + RunAndReturn(func() (int64, error) { + idx++ + return idx, nil + }) + chNodes := map[string]int64{ + "ch1": 1, + "ch2": 1, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch) + s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() + m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, mockAlloc) + s.Require().NoError(err) + s.checkAssignment(m, 1, "ch1", ToWatch) + s.checkAssignment(m, 1, "ch2", ToWatch) + + m.AdvanceChannelState(ctx) + s.checkAssignment(m, 1, "ch1", Watching) + s.checkAssignment(m, 1, "ch2", Watching) + + // Release belfore check return + s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) { + if info.GetVchan().GetChannelName() == "ch1" { + m.Release(1, "ch1") + s.checkAssignment(m, 1, "ch1", ToRelease) + rwChannel, found := m.GetChannel(nodeID, "ch1") + s.True(found) + metaInfo := rwChannel.GetWatchInfo() + s.Require().EqualValues(metaInfo.GetOpID(), 19531) + log.Info("Trying to check this info", zap.Any("meta info", rwChannel.GetWatchInfo())) + } + log.Info("Trying to check this info", zap.Any("rpc info", info)) + return &datapb.ChannelOperationProgressResponse{State: datapb.ChannelWatchState_WatchSuccess, Progress: 100}, nil + }).Twice() + m.AdvanceChannelState(ctx) + + s.checkAssignment(m, 1, "ch1", ToRelease) + s.checkAssignment(m, 1, "ch2", Watched) + }) + s.Run("advance watching channels check ErrNodeNotFound", func() { chNodes := map[string]int64{ "ch1": 1, diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 847458f1a74fc..adede46db0a47 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -72,7 +72,7 @@ type RWChannelStore interface { Update(op *ChannelOpSet) error // UpdateState is used by StateChannelStore only - UpdateState(isSuccessful bool, channels ...RWChannel) + UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) // SegLegacyChannelByNode is used by StateChannelStore only SetLegacyChannelByNode(nodeIDs ...int64) @@ -375,18 +375,17 @@ func (c *StateChannelStore) AddNode(nodeID int64) { } } -func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { - lo.ForEach(channels, func(ch RWChannel, _ int) { - for _, cInfo := range c.channelsInfo { - if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok { - if isSuccessful { - stateChannel.(*StateChannel).TransitionOnSuccess() - } else { - stateChannel.(*StateChannel).TransitionOnFailure() - } +func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) { + channelName := channel.GetName() + if cInfo, ok := c.channelsInfo[nodeID]; ok { + if stateChannel, ok := cInfo.Channels[channelName]; ok { + if isSuccessful { + stateChannel.(*StateChannel).TransitionOnSuccess(opID) + } else { + stateChannel.(*StateChannel).TransitionOnFailure(opID) } } - }) + } } func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) { diff --git a/internal/datacoord/channel_store_test.go b/internal/datacoord/channel_store_test.go index d15414c717d13..13fae5f9c8892 100644 --- a/internal/datacoord/channel_store_test.go +++ b/internal/datacoord/channel_store_test.go @@ -443,7 +443,7 @@ func (s *StateChannelStoreSuite) TestUpdateState() { }, } - store.UpdateState(test.inSuccess, channel) + store.UpdateState(test.inSuccess, bufferID, channel, 0) s.Equal(test.outChannelState, channel.currentState) }) } diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index a416692c7db09..86eac95d420cb 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -566,16 +566,9 @@ func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(*ChannelOpSet) e return _c } -// UpdateState provides a mock function with given fields: isSuccessful, channels -func (_m *MockRWChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { - _va := make([]interface{}, len(channels)) - for _i := range channels { - _va[_i] = channels[_i] - } - var _ca []interface{} - _ca = append(_ca, isSuccessful) - _ca = append(_ca, _va...) - _m.Called(_ca...) +// UpdateState provides a mock function with given fields: isSuccessful, nodeID, channel, opID +func (_m *MockRWChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) { + _m.Called(isSuccessful, nodeID, channel, opID) } // MockRWChannelStore_UpdateState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateState' @@ -585,21 +578,16 @@ type MockRWChannelStore_UpdateState_Call struct { // UpdateState is a helper method to define mock.On call // - isSuccessful bool -// - channels ...RWChannel -func (_e *MockRWChannelStore_Expecter) UpdateState(isSuccessful interface{}, channels ...interface{}) *MockRWChannelStore_UpdateState_Call { - return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", - append([]interface{}{isSuccessful}, channels...)...)} +// - nodeID int64 +// - channel RWChannel +// - opID int64 +func (_e *MockRWChannelStore_Expecter) UpdateState(isSuccessful interface{}, nodeID interface{}, channel interface{}, opID interface{}) *MockRWChannelStore_UpdateState_Call { + return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", isSuccessful, nodeID, channel, opID)} } -func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(isSuccessful bool, channels ...RWChannel)) *MockRWChannelStore_UpdateState_Call { +func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)) *MockRWChannelStore_UpdateState_Call { _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]RWChannel, len(args)-1) - for i, a := range args[1:] { - if a != nil { - variadicArgs[i] = a.(RWChannel) - } - } - run(args[0].(bool), variadicArgs...) + run(args[0].(bool), args[1].(int64), args[2].(RWChannel), args[3].(int64)) }) return _c } @@ -609,7 +597,7 @@ func (_c *MockRWChannelStore_UpdateState_Call) Return() *MockRWChannelStore_Upda return _c } -func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(bool, ...RWChannel)) *MockRWChannelStore_UpdateState_Call { +func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(bool, int64, RWChannel, int64)) *MockRWChannelStore_UpdateState_Call { _c.Call.Return(run) return _c }